diff --git a/README-chinese.md b/README-chinese.md
new file mode 100644
index 00000000..da7d4489
--- /dev/null
+++ b/README-chinese.md
@@ -0,0 +1,150 @@
+[](https://github.com/apache/rocketmq-streams/releases)
+[](https://www.apache.org/licenses/LICENSE-2.0.html)
+[](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
+
+# Features
+
+* Lightweight deployment: it can be deployed standalone or in cluster mode.
+* Various types of data input and output: source supports RocketMQ, while sink supports databases, RocketMQ, etc.
+
+# DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+
+ source
+ .fromFile("~/admin/data/text.txt",false)
+ .map(message->message)
+ .toPrint(1)
+ .start();
+```
+
+# Maven Repository
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+    <version>1.0.2-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+rocketmq-streams implements a series of advanced APIs that allow users to conveniently write stream processing programs and fulfill their own business requirements.
+
+## StreamBuilder
+
+StreamBuilder is used to build the source of a stream task.
+
++ [dataStream(nameSpaceName,pipelineName)]() returns a DataStreamSource instance, used for segmented programming of stream processing tasks.
+
+## DataStream API
+
+### Source
+
+DataStreamSource is the source class of segmented programming, used to interface with various data sources and obtain data from major message queues.
+
++ ```fromFile``` reads data from a file. The method takes two parameters:
+  + ```filePath``` the file path. Required.
+  + ```isJsonData``` whether the data is in JSON format. Optional. Default: ```true```.
+
+
++ ```fromRocketmq``` obtains data from RocketMQ and takes four parameters:
+  + ```topic``` the topic name of the RocketMQ message queue. Required.
+  + ```groupName``` the name of the consumer group. Required.
+  + ```isJson``` whether the data is in JSON format. Optional.
+  + ```tags``` the tags used by the RocketMQ consumer to filter messages. Optional.
+
++ ```fromKafka``` obtains data from Kafka and takes five parameters:
+  + ```bootstrapserver``` the Kafka bootstrap server addresses (IP and port); multiple values are separated by commas. Required.
+  + ```topic``` the Kafka topic. Required.
+  + ```groupName``` the consumer group. Required.
+  + ```isJson``` whether the data is in JSON format. Optional. Default: true.
+  + ```maxThread``` the maximum number of client threads. Optional. Default: 1.
+
++ ```fromMqtt``` reads data from terminals that speak the MQTT protocol, suited to edge-computing scenarios, and takes nine parameters:
+  + ```url``` the address of the MQTT broker. Required.
+  + ```clientId``` the client ID. Required. Clients sharing the same clientId balance the load.
+  + ```topic``` the topic. Required.
+  + ```username``` the username. Optional; used when authentication is enabled on the MQTT side.
+  + ```password``` the password. Optional; used when authentication is enabled on the MQTT side.
+  + ```cleanSession``` whether to clear session information. Optional. Default: true.
+  + ```connectionTimeout``` the connection timeout. Optional. Default: 10s.
+  + ```aliveInterval``` the keep-alive interval used to decide whether the connection is active. Optional. Default: 60s.
+  + ```automaticReconnect``` whether to reconnect automatically after the connection is lost. Optional. Default: true.
+
+
++ ```from``` custom data source; implement the ISource interface to provide your own data source.
+
+### transform
+
+transform allows the data from the input source to be modified during stream processing for the next step; the DataStream API includes several transform classes such as ```DataStream```, ```JoinStream```, ```SplitStream```, and ```WindowStream```.
+
+DataStream implements a series of common stream processing operators:
+
++ ```map``` returns a new DataStream by passing each record of the source to the func function
++ ```flatmap``` similar to map; one input item corresponds to 0 or more output items
++ ```filter``` returns a new DataStream containing only the records of the source DataStream for which func returns true
++ ```forEach``` executes the func function once for each record and returns a new DataStream
++ ```selectFields``` returns the corresponding field values for each record and returns a new DataStream
++ ```operate``` executes a custom function for each record and returns a new DataStream
++ ```script``` executes a script against the fields of each record, returns new fields, and generates a new DataStream
++ ```toPrint``` prints the result on the console and generates a new DataStream instance
++ ```toFile``` saves the result as a file and generates a new DataStream instance
++ ```toMqtt``` outputs the result to a device that speaks the MQTT protocol and generates a new DataStream instance
++ ```toDB``` saves the result to a database
++ ```toRocketmq``` outputs the result to rocketmq
++ ```toKafka``` outputs the result to kafka
++ ```to``` outputs the result to the specified storage through a custom ISink interface
++ ```window``` performs statistical analysis within a window, generally used together with ```groupBy```; ```window()``` defines the window size, and ```groupBy()``` defines the main keys of the analysis, of which several can be specified
+  + ```count``` counts within the window
+  + ```min``` gets the minimum of the statistical value within the window
+  + ```max``` gets the maximum of the statistical value within the window
+  + ```avg``` gets the average of the statistical value within the window
+  + ```sum``` gets the sum of the statistical value within the window
+  + ```reduce``` performs a custom summary calculation within the window
++ ```join``` inner-joins two streams according to the conditions
++ ```leftJoin``` left-joins two streams according to the conditions
++ ```dimJoin``` inner-joins a stream with a dimension table according to the conditions; the dimension table data can come from a file or a database
++ ```dimLeftJoin``` left-joins a stream with a dimension table according to the conditions; the dimension table data can come from a file or a database
++ ```union``` merges two streams
++ ```split``` splits a data stream into different streams by tags for downstream analysis and calculation
++ ```with``` specifies strategies used during the calculation, including the checkpoint storage strategy, the state storage strategy, etc.
+
+#### Strategy
+
+The strategy mechanism controls the underlying behavior of the engine at runtime, such as how checkpoints and state are stored; controls for windows, dual-stream joins, and so on will be added later. All control strategies are passed in through the ```with``` operator, and multiple strategy types can be passed at the same time.
+
+```java
+//Specify the storage strategy for checkpoints.
+source
+ .fromRocketmq("TSG_META_INFO","")
+ .map(message->message+"--")
+ .toPrint(1)
+ .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+ .start();
+```
+
+# Run
+
+As a typical Java application, Rocketmq-Streams can either be embedded in a business system or run as a standalone jar.
+
+First, compile the application source code:
+
+```shell
+mvn -Prelease-all -DskipTests clean install -U
+```
+
+Then run it directly with the java command:
+
+```shell
+java -jar jarName
+```
+
+More detailed examples can be found [here](docs/SUMMARY.md)
\ No newline at end of file
diff --git a/README.md b/README.md
index e4205699..05c7e8d3 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,145 @@
-# Summary
+# RocketMQ Streams [](https://app.travis-ci.com/apache/rocketmq-streams) [](https://app.codecov.io/gh/apache/rocketmq-streams)
-* [Quick Start](quick\_start.md)
-* [创建实时任务数据源](stream\_source.md)
-* [创建实时任务数据输出](stream\_sink.md)
-* [数据处理逻辑](stream\_transform.md)
+[](https://github.com/apache/rocketmq-streams/releases)
+[](https://www.apache.org/licenses/LICENSE-2.0.html)
+[](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
+
+## [Chinese Documentation](./README-chinese.md)
+
+## [Quick Start](./quick_start.md)
+
+## Features
+
+* Lightweight deployment: RocketMQ Streams can be deployed separately or in cluster mode.
+* Various types of data input and output: source supports RocketMQ while sink supports databases and RocketMQ, etc.
+
+## DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+ source
+ .fromFile("~/admin/data/text.txt",false)
+ .map(message->message)
+ .toPrint(1)
+ .start();
+```
+
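The example above reads a file, applies an identity `map`, and prints each record. As a rough stdlib sketch of the same data flow (plain `java.util.stream`, not the rocketmq-streams API — the class and method names below are illustrative only):

```java
import java.util.List;
import java.util.stream.Collectors;

public class PipelineSketch {
    // map(message -> message): the identity transform used in the example above.
    static List<String> runPipeline(List<String> source) {
        return source.stream()
                .map(message -> message)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for fromFile(...): a fixed list of input "messages".
        List<String> source = List.of("hello", "world");
        // Stand-in for toPrint(1): print each record to the console.
        runPipeline(source).forEach(System.out::println);
    }
}
```

Replacing the lambda in `map` changes each record before it reaches the sink, which is exactly the role `map` plays in the chained example.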
+## Maven Repository
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+    <version>1.0.2-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+RocketMQ Streams implements a series of advanced APIs, allowing users to write stream computing programs conveniently and achieve their own business requirements.
+
+## StreamBuilder
+
+StreamBuilder is used to build the source of stream tasks. It contains two methods: ```dataStream()``` and ```tableStream()```, which return two sources, DataStreamSource and TableStreamSource, respectively.
+
++ [dataStream(nameSpaceName,pipelineName)]() returns an instance of DataStreamSource, used for segmented programming to achieve stream computing tasks.
++ [tableStream(nameSpaceName,pipelineName)]() returns an instance of TableStreamSource, used for script programming to achieve stream computing tasks.
+
+## DataStream API
+
+### Source
+
+DataStreamSource is a source class of segmented programming, used to interface with various data sources and obtain data from major message queues.
+
++ ```fromFile```: reads data from a file. This method contains two parameters:
+  + ```filePath```: specifies which file path to read. Required.
+  + ```isJsonData```: specifies whether the data is in JSON format. Optional. Default value: ```true```.
+
+
++ ```fromRocketmq```: obtains data from RocketMQ, including four parameters:
+ + ```topic```: the topic name of RocketMQ. Required.
+ + ```groupName```: the name of the consumer group. Required.
+ + ```isJson```: specifies whether data is in JSON format. Optional.
+ + ```tags```: the tags for filtering messages used by the RocketMQ consumer. Optional.
+
++ ```fromKafka```: reads data from Kafka, including five parameters:
+  + ```bootstrapserver```: the Kafka bootstrap servers. Required.
+  + ```topic```: the topic name of Kafka. Required.
+  + ```groupName```: the name of the consumer group. Required.
+  + ```isJson```: specifies whether the data is in JSON format. Optional.
+  + ```maxThread```: the maximum number of Kafka consumer threads. Optional.
+
++ ```fromMqtt```: reads data from an MQTT service, including nine parameters:
+  + ```url```: the broker address of the MQTT service. Required.
+  + ```clientId```: the client ID. Required.
+  + ```topic```: the name of the MQTT topic. Required.
+  + ```username```: the username. Optional.
+  + ```password```: the password. Optional.
+  + ```cleanSession```: specifies whether to clear the session during a restart. Optional.
+  + ```connectionTimeout```: the connection timeout. Optional.
+  + ```aliveInterval```: the keep-alive interval used to decide whether the connection is active. Optional.
+  + ```automaticReconnect```: specifies whether to reconnect automatically. Optional.
+
++ ```from```: custom data source. You can specify your own data source by implementing ISource interface.
+
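The README does not show the ISource interface itself. As a rough illustration of what a pull-based custom source does (the `SimpleSource` interface below is a hypothetical stand-in, not the real rocketmq-streams ISource API):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a pull-based source; not the real ISource interface.
interface SimpleSource {
    // Fetch the next record, or null when the source is exhausted.
    String poll();
}

public class CustomSourceSketch {
    // A source backed by an in-memory list, standing in for a queue or socket.
    static SimpleSource fromList(List<String> records) {
        Iterator<String> it = records.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Drain the source, handing each record to the downstream pipeline.
    static int drain(SimpleSource source, Consumer<String> downstream) {
        int count = 0;
        for (String record = source.poll(); record != null; record = source.poll()) {
            downstream.accept(record);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        SimpleSource source = fromList(List.of("msg-1", "msg-2", "msg-3"));
        drain(source, System.out::println);
    }
}
```

A real implementation would additionally handle offsets, batching, and reconnects, which is what the built-in `fromRocketmq`/`fromKafka` sources provide.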
+### transform
+
+transform allows the input source data to be modified during the stream calculation process for the next step; DataStream API includes ```DataStream```, ```JoinStream```, ```SplitStream```, ```WindowStream```, and many other transform
+classes.
+
+#### DataStream
+
+DataStream implements a series of common stream calculation operators as follows:
+
++ ```map```: returns a new DataStream by passing each record of the source to the **func** function.
++ ```flatmap```: similar to map. One input item corresponds to 0 or more output items.
++ ```filter```: returns a new DataStream containing only the records of the source DataStream for which the **func** function returns **true**.
++ ```forEach```: executes the **func** function once for each record and returns a new DataStream.
++ ```selectFields```: returns the corresponding field value for each record, and returns a new DataStream.
++ ```operate```: executes a custom function for each record and returns a new DataStream.
++ ```script```: executes a script for each recorded field, returns new fields, and generates a new DataStream.
++ ```toPrint```: prints the result on the console and generates a new DataStreamAction instance.
++ ```toFile```: saves the result as a file and generates a new DataStreamAction instance.
++ ```toMqtt```: outputs the result to a device that speaks the MQTT protocol and generates a new DataStreamAction instance.
++ ```toDB```: saves the result to the database.
++ ```toRocketmq```: outputs the result to RocketMQ.
++ ```toKafka```: outputs the result to Kafka.
++ ```to```: outputs the result to the specified storage through the custom ISink interface.
++ ```window```: performs relevant statistical analysis in the window, generally used in conjunction with ```groupBy```. ```window()``` is used to define the size of the window, and ```groupBy()``` is used to define the main keys of the statistical
+  analysis. You can specify multiple main keys:
+ + ```count```: counts in the window.
+ + ```min```: gets the minimum of the statistical value in the window.
+ + ```max```: gets the maximum of the statistical value in the window.
+ + ```avg```: gets the average of the statistical values in the window.
+ + ```sum```: gets the sum of the statistical values in the window.
+ + ```reduce```: performs custom summary calculations in the window.
++ ```join```: inner-joins two streams according to the conditions; only matching records are retained.
++ ```leftJoin```: left-joins two streams according to the conditions; all records of the left stream are retained.
++ ```dimJoin```: inner-joins a stream with a dimension table, which can be a file or a database table; all matching records are retained.
++ ```dimLeftJoin```: left-joins a stream with a dimension table; all records of the stream are retained, and fields that do not match the dimension table are left blank.
++ ```union```: merges the two streams.
++ ```split```: splits a data stream into different data streams according to tags for downstream analysis and calculation.
++ ```with```: specifies related strategies during the calculation, including Checkpoint and state storage strategies, etc.
+
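The `window`/`groupBy` aggregations above behave like keyed aggregations over one window's batch of records. As a stdlib analogy (plain `java.util.stream`, not the rocketmq-streams API), grouping records by key and counting within a single batch looks like:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByCountSketch {
    // Count records per key within one window's batch, as groupBy + count would.
    static Map<String, Long> countByKey(List<String> windowBatch) {
        return windowBatch.stream()
                .collect(Collectors.groupingBy(record -> record, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> batch = List.of("page_a", "page_b", "page_a");
        System.out.println(countByKey(batch));
    }
}
```

The streaming engine does the same per window, then emits one result per key when the window closes; `min`/`max`/`avg`/`sum`/`reduce` just swap in a different downstream collector.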
+# Strategy
+
+The Strategy mechanism is mainly used to control the underlying logic during the operation of the computing engine, such as the storage methods of Checkpoint and state, etc. Controls for windows, dual-stream joins, and so on will
+be added later. All control strategies are passed in through the ```with``` operator, and multiple strategy types can be passed at the same time.
+
+```java
+//Specify the storage strategy for Checkpoint.
+source
+ .fromRocketmq("TSG_META_INFO","")
+ .map(message->message+"--")
+ .toPrint(1)
+ .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+ .start();
+```
+
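Conceptually, a checkpoint strategy periodically persists the processing position so a restarted task resumes where it left off; `CheckpointStrategy.db(...)` stores that position in a database. A toy in-memory sketch of the idea (illustrative only, not the real rocketmq-streams implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointSketch {
    // Stand-in for the checkpoint store; CheckpointStrategy.db(...) would use a DB table.
    private final Map<String, Integer> store = new HashMap<>();

    void saveOffset(String queue, int offset) {
        store.put(queue, offset);
    }

    int restoreOffset(String queue) {
        // A task that has never checkpointed starts from the beginning.
        return store.getOrDefault(queue, 0);
    }

    // Process records, checkpointing after every record for simplicity.
    int process(String queue, List<String> records) {
        int offset = restoreOffset(queue);
        for (; offset < records.size(); offset++) {
            // ... handle records.get(offset) ...
            saveOffset(queue, offset + 1);
        }
        return offset;
    }

    public static void main(String[] args) {
        CheckpointSketch cp = new CheckpointSketch();
        cp.process("q1", List.of("a", "b", "c"));
        // After a "restart", processing resumes at the stored offset.
        System.out.println(cp.restoreOffset("q1"));
    }
}
```

Real engines checkpoint at intervals rather than per record, trading a little replayed work after a crash for far less write traffic to the store.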
diff --git a/SUMMARY.md b/SUMMARY.md
deleted file mode 100644
index 2c991438..00000000
--- a/SUMMARY.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# Table of contents
-
-* [Summary](README.md)
-* [快速开发](quick\_start.md)
-* [stream\_source](stream\_source.md)
-* [stream\_sink](stream\_sink.md)
-* [数据处理逻辑](stream\_transform.md)
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 00000000..5282bb1d
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,150 @@
+[](https://github.com/apache/rocketmq-streams/releases)
+[](https://www.apache.org/licenses/LICENSE-2.0.html)
+[](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
+[](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
+[](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
+
+# Features
+
+* Lightweight deployment: it can be deployed standalone or in cluster mode.
+* Various types of data input and output: source supports RocketMQ, while sink supports databases, RocketMQ, etc.
+
+# DataStream Example
+
+```java
+import org.apache.rocketmq.streams.client.transform.DataStream;
+
+DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
+
+ source
+ .fromFile("~/admin/data/text.txt",false)
+ .map(message->message)
+ .toPrint(1)
+ .start();
+```
+
+# Maven Repository
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+</dependency>
+```
+
+# Core API
+
+rocketmq-streams implements a series of advanced APIs that allow users to conveniently write stream processing programs and fulfill their own business requirements.
+
+## StreamBuilder
+
+StreamBuilder is used to build the source of a stream task.
+
++ [dataStream(nameSpaceName,pipelineName)]() returns a DataStreamSource instance, used for segmented programming of stream processing tasks.
+
+## DataStream API
+
+### Source
+
+DataStreamSource is the source class of segmented programming, used to interface with various data sources and obtain data from major message queues.
+
++ ```fromFile``` reads data from a file. The method takes two parameters:
+  + ```filePath``` the file path. Required.
+  + ```isJsonData``` whether the data is in JSON format. Optional. Default: ```true```.
+
+
++ ```fromRocketmq``` obtains data from RocketMQ and takes four parameters:
+  + ```topic``` the topic name of the RocketMQ message queue. Required.
+  + ```groupName``` the name of the consumer group. Required.
+  + ```isJson``` whether the data is in JSON format. Optional.
+  + ```tags``` the tags used by the RocketMQ consumer to filter messages. Optional.
+
++ ```fromKafka``` obtains data from Kafka and takes five parameters:
+  + ```bootstrapserver``` the Kafka bootstrap server addresses (IP and port); multiple values are separated by commas. Required.
+  + ```topic``` the Kafka topic. Required.
+  + ```groupName``` the consumer group. Required.
+  + ```isJson``` whether the data is in JSON format. Optional. Default: true.
+  + ```maxThread``` the maximum number of client threads. Optional. Default: 1.
+
++ ```fromMqtt``` reads data from terminals that speak the MQTT protocol, suited to edge-computing scenarios, and takes nine parameters:
+  + ```url``` the address of the MQTT broker. Required.
+  + ```clientId``` the client ID. Required. Clients sharing the same clientId balance the load.
+  + ```topic``` the topic. Required.
+  + ```username``` the username. Optional; used when authentication is enabled on the MQTT side.
+  + ```password``` the password. Optional; used when authentication is enabled on the MQTT side.
+  + ```cleanSession``` whether to clear session information. Optional. Default: true.
+  + ```connectionTimeout``` the connection timeout. Optional. Default: 10s.
+  + ```aliveInterval``` the keep-alive interval used to decide whether the connection is active. Optional. Default: 60s.
+  + ```automaticReconnect``` whether to reconnect automatically after the connection is lost. Optional. Default: true.
+
+
++ ```from``` custom data source; implement the ISource interface to provide your own data source.
+
+### transform
+
+transform allows the data from the input source to be modified during stream processing for the next step; the DataStream API includes several transform classes such as ```DataStream```, ```JoinStream```, ```SplitStream```, and ```WindowStream```.
+
+DataStream implements a series of common stream processing operators:
+
++ ```map``` returns a new DataStream by passing each record of the source to the func function
++ ```flatmap``` similar to map; one input item corresponds to 0 or more output items
++ ```filter``` returns a new DataStream containing only the records of the source DataStream for which func returns true
++ ```forEach``` executes the func function once for each record and returns a new DataStream
++ ```selectFields``` returns the corresponding field values for each record and returns a new DataStream
++ ```operate``` executes a custom function for each record and returns a new DataStream
++ ```script``` executes a script against the fields of each record, returns new fields, and generates a new DataStream
++ ```toPrint``` prints the result on the console and generates a new DataStream instance
++ ```toFile``` saves the result as a file and generates a new DataStream instance
++ ```toMqtt``` outputs the result to a device that speaks the MQTT protocol and generates a new DataStream instance
++ ```toDB``` saves the result to a database
++ ```toRocketmq``` outputs the result to rocketmq
++ ```toKafka``` outputs the result to kafka
++ ```to``` outputs the result to the specified storage through a custom ISink interface
++ ```window``` performs statistical analysis within a window, generally used together with ```groupBy```; ```window()``` defines the window size, and ```groupBy()``` defines the main keys of the analysis, of which several can be specified
+  + ```count``` counts within the window
+  + ```min``` gets the minimum of the statistical value within the window
+  + ```max``` gets the maximum of the statistical value within the window
+  + ```avg``` gets the average of the statistical value within the window
+  + ```sum``` gets the sum of the statistical value within the window
+  + ```reduce``` performs a custom summary calculation within the window
++ ```join``` inner-joins two streams according to the conditions
++ ```leftJoin``` left-joins two streams according to the conditions
++ ```dimJoin``` inner-joins a stream with a dimension table according to the conditions; the dimension table data can come from a file or a database
++ ```dimLeftJoin``` left-joins a stream with a dimension table according to the conditions; the dimension table data can come from a file or a database
++ ```union``` merges two streams
++ ```split``` splits a data stream into different streams by tags for downstream analysis and calculation
++ ```with``` specifies strategies used during the calculation, including the checkpoint storage strategy, the state storage strategy, etc.
+
+#### Strategy
+
+The strategy mechanism controls the underlying behavior of the engine at runtime, such as how checkpoints and state are stored; controls for windows, dual-stream joins, and so on will be added later. All control strategies are passed in through the ```with``` operator, and multiple strategy types can be passed at the same time.
+
+```java
+//Specify the storage strategy for checkpoints.
+source
+ .fromRocketmq("TSG_META_INFO","")
+ .map(message->message+"--")
+ .toPrint(1)
+ .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
+ .start();
+```
+
+# Run
+
+As a typical Java application, Rocketmq-Streams can either be embedded in a business system or run as a standalone jar.
+
+First, compile the application source code:
+
+```shell
+mvn -Prelease-all -DskipTests clean install -U
+```
+
+Then run it directly with the java command:
+
+```shell
+java -jar jarName
+```
+
+More detailed examples can be found [here](docs/SUMMARY.md)
\ No newline at end of file
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
new file mode 100644
index 00000000..950f99bd
--- /dev/null
+++ b/docs/SUMMARY.md
@@ -0,0 +1,8 @@
+# Summary
+
+* [Introduction](README.md)
+* [Quick Start](quick_start/README.md)
+* [Create the data source of a real-time task](stream_source/README.md)
+* [Create the data output of a real-time task](stream_sink/README.md)
+* [Data processing logic](stream_transform/README.md)
+
diff --git a/docs/quick_start/README.md b/docs/quick_start/README.md
new file mode 100644
index 00000000..a60dbb95
--- /dev/null
+++ b/docs/quick_start/README.md
@@ -0,0 +1,46 @@
+# Quick development
+
+## Import the related jar packages
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+</dependency>
+```
+
+## Develop a real-time application
+
+```java
+
+public class RocketmqExample {
+
+ public static void main(String[] args) {
+
+ DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
+
+ dataStream
+            .fromFile("data.csv", false)         //build the data source of the real-time task
+            .map(message -> message.split(","))  //build the processing logic of the real-time task
+            .toPrint(1)                          //build the output of the real-time task
+            .start();                            //start the real-time task
+ }
+}
+
+```
+
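The `map` step in the example above splits each CSV line into its fields; in plain Java that transformation is just `String.split`:

```java
public class SplitSketch {
    public static void main(String[] args) {
        // One "message" as it would arrive from data.csv.
        String message = "2022-01-01,page_a,click";
        // The same transformation the map(message -> message.split(",")) step applies.
        String[] fields = message.split(",");
        for (String field : fields) {
            System.out.println(field);
        }
    }
}
```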
+## Run
+
+Package:
+
+```shell
+mvn -Prelease-all -DskipTests clean install -U
+```
+
+Run:
+
+```shell
+java -jar jarName
+```
diff --git a/stream_sink.md b/docs/stream_sink/README.md
similarity index 99%
rename from stream_sink.md
rename to docs/stream_sink/README.md
index d7718865..fd560aa1 100644
--- a/stream_sink.md
+++ b/docs/stream_sink/README.md
@@ -80,14 +80,15 @@
DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress);
```
-## kafka
+
+## kafka
+
```java
String bootstrapServers = ......;//kafka的bootstrap server
String topic = ......; //kafka的topic
DataStream dataStream = dataStream.toKafka(bootstrapServers, topic);
```
-
# MQTT协议
```java
diff --git a/stream_source.md b/docs/stream_source/README.md
similarity index 99%
rename from stream_source.md
rename to docs/stream_source/README.md
index 8ae7b7e4..13831285 100644
--- a/stream_source.md
+++ b/docs/stream_source/README.md
@@ -17,6 +17,7 @@
String filepath = .....; //文件路径
DataStream dataStream = dataStreamSource.fromFile(filePath);
```
+
或者
```java
@@ -25,7 +26,6 @@
DataStream dataStream = dataStreamSource.fromFile(filePath, isJsonData);
```
-
## Rocketmq
```java
@@ -36,6 +36,7 @@
DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, namesrvAddress);
```
+
或者
```java
@@ -47,7 +48,9 @@
DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, isJsonData, namesrvAddress);
```
+
或者
+
```java
String topic = .....; //rocketmq 的topic
@@ -59,7 +62,8 @@
```
-## kafka
+## kafka
+
```java
String bootstrapServers = ......;//kafka的bootstrap server
String topic = ......; //kafka的topic
@@ -68,7 +72,9 @@
Integer maxThread = 1; //客户端线程数
DataStream dataStream = dataStreamSource.fromKafka(bootstrapServers, topic, groupName, isJsonData, maxThread);
```
+
或者
+
```java
String bootstrapServers = ......;//kafka的bootstrap server
String topic = ......; //kafka的topic
@@ -76,7 +82,9 @@
Boolean isJsonData = true; //是否json
DataStream dataStream = dataStreamSource.fromKafka(bootstrapServers, topic, groupName, isJsonData);
```
+
或者
+
```java
String bootstrapServers = ......;//kafka的bootstrap server
String topic = ......; //kafka的topic
@@ -84,10 +92,8 @@
DataStream dataStream = dataStreamSource.fromKafka(bootstrapServers, topic, groupName);
```
-
-
-
# MQTT协议
+
```java
String url = ......;
@@ -96,7 +102,9 @@
DataStream dataStream = dataStreamSource.fromMqtt(url, cliientId, topic);
```
+
或者
+
```java
String url = ......;
@@ -107,7 +115,9 @@
DataStream dataStream = dataStreamSource.fromMqtt(url, cliientId, topic, username, password);
```
+
或者
+
````java
String url = ......;
@@ -124,6 +134,7 @@
````
##自定义Source
+
````java
DataStream dataStream = dataStreamSource.from(new ISource() {});
````
diff --git a/stream_transform.md b/docs/stream_transform/README.md
similarity index 100%
rename from stream_transform.md
rename to docs/stream_transform/README.md
diff --git a/pom.xml b/pom.xml
index b2792adc..200b6e32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,4 +1,4 @@
-
+
-
4.0.0
@@ -26,9 +26,10 @@
18
+
org.apache.rocketmq
rocketmq-streams
- 1.0.2-SNAPSHOT
+ 2.0.1-SNAPSHOT
Apache RocketMQ Streams ${project.version}
pom
https://rocketmq.apache.org/
@@ -38,27 +39,35 @@
rocketmq-streams-dim
rocketmq-streams-transport-minio
rocketmq-streams-script
- rocketmq-streams-configurable
rocketmq-streams-serviceloader
rocketmq-streams-filter
rocketmq-streams-schedule
rocketmq-streams-lease
rocketmq-streams-db-operator
- rocketmq-streams-dbinit
+ rocketmq-streams-db-init
rocketmq-streams-window
+ rocketmq-streams-channel-metaq
rocketmq-streams-clients
rocketmq-streams-channel-rocketmq
rocketmq-streams-channel-db
rocketmq-streams-channel-http
rocketmq-streams-state
rocketmq-streams-examples
- rocketmq-streams-checkpoint
rocketmq-streams-connectors
rocketmq-streams-channel-syslog
rocketmq-streams-channel-es
rocketmq-streams-channel-kafka
rocketmq-streams-channel-mqtt
- rocketmq-streams-cep
+ rocketmq-streams-channel-sls
+ rocketmq-streams-channel-openapi
+ rocketmq-streams-channel-view
+ rocketmq-streams-channel-tencent
+ rocketmq-streams-dispatcher
+ rocketmq-streams-tasks
+ rocketmq-streams-sts
+ rocketmq-streams-storage
+ rocketmq-streams-local-cache
+ rocketmq-streams-channel-huawei
@@ -67,42 +76,54 @@
1.13.0
2.11
false
- 1.8
- 1.8
- 1.8
+ 11
+ 11
+ 11
UTF-8
${java.encoding}
- 1.1
+
5.1.14.RELEASE
1.0-rc5
- 5.1.40
- 1.2.25
- 2.2.1
+ 8.0.31
+ 1.2.83
+ 2.3.2
4.5.13
+ 1.2
2.7
+ 3.2.2
+ 3.12.0
+ 1.15
4.13.1
- 25.1-jre
+ 31.1-jre
2.1.8
3.2.0
- 6.6.4
- 4.9.3
+ 8.3.2
+ 5.0.0
5.4.0-2.0.0
3.5.2
2.8.5
0.1.9
2.7.0
2.12.4
- 1.2.3
3.0.10
- 6.6.4
- 3.2.1
- 2.5
7.4.0
1.1.0
1.2.2
5.3.0
2.56
+ 1.6
1.7.36
+ 1.7.36
+ 1.6.0
+ 2.5.0
+ 0.6.74
+ 0.3.11
+ 0.6.33
+ 3.2.0
+ 1.2.8
+ 1.0.0
+ 4.5.1
+ 3.10.2
@@ -128,7 +149,10 @@
build_without_test.sh
NOTICE
LICENSE
- *.md
+ README.md
+ README-chinese.md
+ QUICKSTART.md
+ quick_start.md
.github/**
*/target/**
*/*.iml
@@ -199,17 +223,61 @@
-
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-tencentcloudapi
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-ocsf
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-shuffle-store
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-local-cache
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-dispatcher
+ ${project.version}
+
org.apache.rocketmq
rocketmq-streams-commons
${project.version}
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-metaq
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-view
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-sts
+ ${project.version}
+
org.apache.rocketmq
rocketmq-streams-channel-es
${project.version}
+
+ org.apache.rocketmq
+ rocketmq-streams-tasks
+ ${project.version}
+
org.apache.rocketmq
rocketmq-streams-channel-mqtt
@@ -217,7 +285,22 @@
org.apache.rocketmq
- rocketmq-streams-dbinit
+ rocketmq-streams-db-init
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-openapi
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-tencent
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-huawei
${project.version}
@@ -244,16 +327,6 @@
org.apache.rocketmq
rocketmq-streams-configurable
${project.version}
-
-
- ch.qos.logback
- logback-classic
-
-
- ch.qos.logback
- logback-core
-
-
org.apache.rocketmq
@@ -284,22 +357,17 @@
org.apache.rocketmq
rocketmq-streams-script
${project.version}
-
-
- ch.qos.logback
- logback-classic
-
-
- ch.qos.logback
- logback-core
-
-
org.apache.rocketmq
rocketmq-streams-script-python
${project.version}
+
+ org.apache.rocketmq
+ rocketmq-streams-channel-rocketmq
+ ${project.version}
+
org.apache.rocketmq
rocketmq-streams-serviceloader
@@ -327,12 +395,12 @@
org.apache.rocketmq
- rocketmq-streams-channel-rocketmq
+ rocketmq-streams-channel-kafka
${project.version}
org.apache.rocketmq
- rocketmq-streams-channel-kafka
+ rocketmq-streams-channel-sls
${project.version}
@@ -345,24 +413,20 @@
rocketmq-streams-examples
${project.version}
-
-
-
-
-
org.apache.rocketmq
- rocketmq-tools
- ${rocketmq.version}
+ rocketmq-streams-localcache
+ ${project.version}
+
org.apache.rocketmq
- rocketmq-common
+ rocketmq-client
${rocketmq.version}
org.apache.rocketmq
- rocketmq-client
+ rocketmq-tools
${rocketmq.version}
@@ -370,45 +434,85 @@
rocketmq-acl
${rocketmq.version}
-
+
+ com.taobao.metaq.final
+ metaq-client
+ 4.0.2.Final
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ ch.qos.logback
+ logback-core
+
+
+
+
+ com.alibaba.rocketmq
+ rocketmq-common
+ 4.0.2
+
-
+
-
- com.alibaba
- fastjson
- ${fastjson.version}
+ commons-io
+ commons-io
+ ${commons-io.version}
-
- junit
- junit
- ${junit.version}
+ commons-codec
+ commons-codec
+ ${commons-codec.version}
-
- commons-logging
- commons-logging
- ${commons-logging.version}
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
- org.apache.rocketmq
- commons
- ${project.version}
+ commons-collections
+ commons-collections
+ ${commons-collections.version}
+
+
+
+
+
- commons-io
- commons-io
- ${commons-io.version}
+ org.slf4j
+ slf4j-api
+ ${slf4j-api.version}
-
org.slf4j
slf4j-log4j12
${slf4j-log4j12.version}
+
+ org.elasticsearch.client
+ elasticsearch-rest-high-level-client
+ ${elasticsearch.version}
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-client
+ ${elasticsearch.version}
+
+
+
+ org.elasticsearch
+ elasticsearch
+ ${elasticsearch.version}
+
+
+
+
+
com.google.code.gson
gson
@@ -433,6 +537,25 @@
+
+ com.google.re2j
+ re2j
+ ${re2j.version}
+
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+
+ junit
+ junit
+ ${junit.version}
+
+
com.lmax
disruptor
@@ -513,22 +636,11 @@
-
- ch.qos.logback
- logback-core
- ${logback-core.version}
-
-
- ch.qos.logback
- logback-classic
- ${logback-core.version}
-
io.minio
minio
${minio.version}
-
org.rocksdb
rocksdbjni
@@ -540,53 +652,88 @@
0.9.48
- commons-collections
- commons-collections
- ${commons-collections.version}
-
-
- commons-lang
- commons-lang
- ${commons-lang.version}
+ org.apache.kafka
+ kafka_2.12
+ ${kafka.version}
- org.elasticsearch.client
- elasticsearch-rest-high-level-client
- ${elasticsearch.version}
+ de.ruedigermoeller
+ fst
+ ${fst.version}
- org.elasticsearch.client
- elasticsearch-rest-client
- ${elasticsearch.version}
+ com.esotericsoftware
+ kryo
+ ${kryo.version}
-
- org.elasticsearch
- elasticsearch
- ${elasticsearch.version}
+ net.agkn
+ hll
+ ${hll.version}
-
- org.apache.kafka
- kafka_2.12
- ${kafka.version}
-
-
org.eclipse.paho
org.eclipse.paho.client.mqttv3
${paho.version}
-
+
- de.ruedigermoeller
- fst
- ${fst.version}
+ com.google.protobuf
+ protobuf-java
+ ${protobuf.version}
- com.esotericsoftware
- kryo
- ${kryo.version}
+ com.aliyun.openservices
+ aliyun-log
+ ${aliyun-log.version}
+
+
+ com.aliyun.openservices
+ aliyun-log-producer
+ ${aliyun-log-producer.version}
+
+
+ com.aliyun.openservices
+ loghub-client-lib
+ ${loghub.version}
+
+
+
+ com.aliyun
+ aliyun-java-sdk-sts-internal
+ ${sts.version}
+
+
+ com.aliyun.ram
+ ram-auth-client
+ ${ram-auth-client.version}
+
+
+ com.aliyun
+ aliyun-java-sdk-ram-inner
+ ${ram-inner.version}
+
+
+ com.aliyun
+ aliyun-java-sdk-core
+ ${aliyun-java-sdk-core.version}
+
+
+ com.univocity
+ univocity-parsers
+ 2.9.1
+
+
+ com.aliyun.oss
+ aliyun-sdk-oss
+ ${oss.version}
+
+
+
+ org.springframework.boot
+ spring-boot-loader
+ 2.5.12
diff --git a/quick_start.md b/quick_start.md
index a60dbb95..adcb529d 100644
--- a/quick_start.md
+++ b/quick_start.md
@@ -1,46 +1,84 @@
-# 快速开发
+## Quick start with rocketmq-streams
+---
-## 引入相关的jar包
+### Preface
-```xml
+This document describes how to quickly build a stream processing task based on rocketmq-streams. Some of the examples use rocketmq; see the [rocketmq quick-start documentation](https://rocketmq.apache.org/docs/quick-start/) for how to set it up.
-
- org.apache.rocketmq
- rocketmq-streams-clients
-
+### 1. Build from source
-```
+#### 1.1 Build environment
-## 开发实时应用程序
+- JDK 1.8 and above
+- Maven 3.2 and above
-```java
+#### 1.2 Build Rocketmq-streams
-public class RocketmqExample {
+```shell
+git clone https://github.com/apache/rocketmq-streams.git
+cd rocketmq-streams
+mvn clean -DskipTests install -U
- public static void main(String[] args) {
+```
- DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
+### 2. Create an application based on rocketmq-streams
- dataStream
- .fromFile("data.csv", false) //构建实时任务的数据源
- .map(message -> message.split(",")) //构建实时任务处理的逻辑过程
- .toPrint(1) //构建实时任务的输出
- .start(); //启动实时任务
- }
-}
+#### 2.1 pom dependency
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-streams-clients</artifactId>
+</dependency>
```
-## 运行
+#### 2.2 Shade the clients dependency
-打包
+```xml
-```shell
-mvn -Prelease-all -DskipTests clean install -U
+<build>
+    <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>3.2.1</version>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <minimizeJar>false</minimizeJar>
+                        <shadedArtifactAttached>true</shadedArtifactAttached>
+                        <artifactSet>
+                            <includes>
+                                <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+                            </includes>
+                        </artifactSet>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+    </plugins>
+</build>
```
-运行
+#### 2.3 Write the business code
+
+Quickly write a small program that counts page clicks: please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
+
+#### 2.4 Run
+
+- Prerequisite: when reading data from rocketmq for stream processing, the topics used must be allowed to be created automatically in rocketmq, because groupBy operations use rocketmq as the read/write destination for shuffle data.
+- Command:
-```shell
- java -jar jarName mainClass
```
+java -Xms1024m \
+  -Xmx1024m \
+  -Dlog4j.level=ERROR \
+  -Dlog4j.home=/logs \
+  -jar XXXX-shade.jar
+```
+
diff --git a/rocketmq-streams-channel-db/pom.xml b/rocketmq-streams-channel-db/pom.xml
index b31b7110..7a5ac938 100755
--- a/rocketmq-streams-channel-db/pom.xml
+++ b/rocketmq-streams-channel-db/pom.xml
@@ -15,14 +15,14 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-
4.0.0
org.apache.rocketmq
rocketmq-streams
- 1.0.2-SNAPSHOT
+ 2.0.1-SNAPSHOT
rocketmq-streams-channel-db
ROCKETMQ STREAMS :: channel-db
@@ -30,7 +30,8 @@
org.apache.rocketmq
- rocketmq-streams-db-operator
+ rocketmq-streams-connectors
+
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/CycleSplit.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/CycleSplit.java
index 706f07e1..4c9ac5a4 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/CycleSplit.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/CycleSplit.java
@@ -26,11 +26,11 @@ public class CycleSplit extends DynamicMultipleDBSplit implements Serializable {
private static final long serialVersionUID = 4309494143340650195L;
String cyclePeriod;
- public CycleSplit(){
+ public CycleSplit() {
}
- public CycleSplit(String version){
+ public CycleSplit(String version) {
this.cyclePeriod = version;
}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/DynamicMultipleDBSplit.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/DynamicMultipleDBSplit.java
index bf3662ea..32478373 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/DynamicMultipleDBSplit.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/DynamicMultipleDBSplit.java
@@ -56,7 +56,6 @@ public String getQueueId() {
return logicTableName + "_" + suffix;
}
-
@Override
public String getQueue() {
return logicTableName + "_" + suffix;
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index 24b4ad71..4c9bd469 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -24,14 +24,12 @@
import java.sql.ResultSet;
import java.util.List;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
-import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.configuration.ConfigurationKey;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
@@ -41,35 +39,32 @@
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Mainly used for writing to a DB. The input can be either an insert/replace template or a MetaData object; choose one of the two. Both support batch inserts to raise throughput. SQL template: insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}'). MetaData: describes each field's type and whether it is required. In SQL mode the system concatenates a batch (batchSize) of rows into one large SQL statement; in metadata mode the same large SQL is built from the field descriptions.
*/
public class DBSink extends AbstractSink {
- static final Log logger = LogFactory.getLog(DBSink.class);
-
public static final String SQL_MODE_DEFAULT = "default";
public static final String SQL_MODE_REPLACE = "replace";
public static final String SQL_MODE_IGNORE = "ignore";
-
- @ENVDependence protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+ static final Logger LOGGER = LoggerFactory.getLogger(DBSink.class);
+ /**
+ * Parse out the insert-values portion of the SQL; for batch inserts this performs better
+ */
+ private static final String VALUES_NAME = "values";
+ @ENVDependence protected String jdbcDriver = ConfigurationKey.DEFAULT_JDBC_DRIVER;
@ENVDependence protected String url;
@ENVDependence protected String userName;
@ENVDependence protected String tableName; //the table to insert into
@ENVDependence protected String password;
@ENVDependence protected String sqlMode;
-
- protected MetaData metaData;//可以指定meta data,和insertSQL二选一
-
protected String insertSQLTemplate;//the insert template, choose this or metadata. insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
-
protected boolean openSqlCache = true;
-
protected transient IMessageCache sqlCache;//cache sql, batch submit sql
-
boolean isMultiple = false; //whether to write to multiple tables
-
/**
* Most db connection properties are resolved by name, so a name prefix can be supplied. If it is empty, this object's name is used; if that is also empty, the simple class name.
*
@@ -79,7 +74,7 @@ public class DBSink extends AbstractSink {
public DBSink(String insertSQL, String dbInfoNamePrefix) {
setType(IChannel.TYPE);
if (StringUtil.isEmpty(dbInfoNamePrefix)) {
- dbInfoNamePrefix = getConfigureName();
+ dbInfoNamePrefix = getName();
}
if (StringUtil.isEmpty(dbInfoNamePrefix)) {
dbInfoNamePrefix = this.getClass().getSimpleName();
@@ -115,7 +110,7 @@ public DBSink(String url, String userName, String password, String tableName, St
@Override protected boolean initConfigurable() {
if (this.metaData == null) {
try {
- Class.forName("com.mysql.jdbc.Driver");
+ Class.forName(this.jdbcDriver);
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(this.url, this.userName, this.password);
DatabaseMetaData connectionMetaData = connection.getMetaData();
@@ -131,6 +126,7 @@ public DBSink(String url, String userName, String password, String tableName, St
List<String> insertFields = Lists.newArrayList();
List<String> insertValues = Lists.newArrayList();
List<String> duplicateKeys = Lists.newArrayList();
+
fieldList.forEach(field -> {
String fieldName = field.getFieldName();
insertFields.add("`" + fieldName + "`");
@@ -154,7 +150,7 @@ public DBSink(String url, String userName, String password, String tableName, St
this.sqlCache = new MessageCache<>(sqls -> {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
- dataSource.executSqls(sqls);
+ dataSource.executeSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -224,15 +220,10 @@ protected void executeSQL(JDBCDriver dbDataSource, List sqls) {
this.sqlCache.addCache(sql);
}
} else {
- dbDataSource.executSqls(sqls);
+ dbDataSource.executeSqls(sqls);
}
}
- /**
- * 解析出insert value数据部分,对于批量的插入,效果会更佳
- */
- private static final String VALUES_NAME = "values";
-
protected String parseInsertValues(String insertSQL) {
int start = insertSQL.toLowerCase().indexOf(VALUES_NAME);
if (start == -1) {
@@ -306,14 +297,6 @@ public void setSqlMode(String sqlMode) {
this.sqlMode = sqlMode;
}
- public MetaData getMetaData() {
- return metaData;
- }
-
- public void setMetaData(MetaData metaData) {
- this.metaData = metaData;
- }
-
public boolean isOpenSqlCache() {
return openSqlCache;
}
@@ -352,11 +335,11 @@ private final String getCreateTableSqlFromOther(String sourceTableName, String t
String createTableSql = MetaDataUtils.getCreateTableSqlByTableName(url, userName, password, sourceTableName);
if (createTableSql == null) {
String errMsg = String.format("source table is not exist. multiple db sink must be dependency logic table meta for auto create sub table. logic table name is ", sourceTableName);
- logger.error(errMsg);
+ LOGGER.error(errMsg);
throw new RuntimeException(errMsg);
}
createTableSql = createTableSql.replace(sourceTableName, targetTableName);
- logger.info(String.format("createTableSql is %s", createTableSql));
+ LOGGER.info(String.format("createTableSql is %s", createTableSql));
return createTableSql;
}
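The `parseInsertValues` change above keeps the logic that splits an INSERT template at its `values` keyword so a batch of rows can share one statement prefix. A minimal, self-contained sketch of that batching idea (class and method names here are illustrative, not part of DBSink):

```java
import java.util.List;

public class BatchSqlSketch {
    // Locate the "values" keyword (case-insensitive), keep everything up to
    // and including it as the shared prefix, then join the row tuples.
    static String batchInsert(String insertSQL, List<String> valueTuples) {
        int start = insertSQL.toLowerCase().indexOf("values");
        if (start == -1) {
            throw new IllegalArgumentException("no values clause: " + insertSQL);
        }
        String prefix = insertSQL.substring(0, start + "values".length());
        return prefix + String.join(",", valueTuples);
    }

    public static void main(String[] args) {
        // prints: insert into t_user(`name`,`age`) values('a',1),('b',2)
        System.out.println(batchInsert("insert into t_user(`name`,`age`) values",
            List.of("('a',1)", "('b',2)")));
    }
}
```

This is why the sink caches raw SQL strings: one large statement per batch amortizes the round trip.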
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
index 0c36b629..53ae49d9 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -24,6 +24,8 @@
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.model.ServiceName;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.rocketmq.streams.db.source.DBSource;
@AutoService(IChannelBuilder.class)
@ServiceName(value = DBSinkBuilder.TYPE, aliasName = "db")
@@ -32,15 +34,8 @@ public class DBSinkBuilder implements IChannelBuilder {
@Override
public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- DBSink sink = new DBSink();
- JSONObject proJson = createFormatProperty(properties);
- sink.setUrl(proJson.getString("url"));
- sink.setUserName(proJson.getString("userName"));
- sink.setPassword(proJson.getString("password"));
- sink.setTableName(proJson.getString("tableName"));
- sink.setSqlMode(proJson.getString("sqlMode"));
- sink.setMetaData(metaData);
- return sink;
+ return (ISink) ConfigurableUtil.create(DBSink.class.getName(), namespace, name, createFormatProperty(properties), null);
+
}
/**
@@ -66,7 +61,7 @@ protected JSONObject createFormatProperty(Properties properties) {
@Override
public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- throw new RuntimeException("can not support this method");
+ return (ISource) ConfigurableUtil.create(DBSource.class.getName(), namespace, name, createFormatProperty(properties), null);
}
@Override
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java
index d57fe015..c1a24aee 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DynamicMultipleDBSink.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.streams.db.sink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;
@@ -26,7 +24,7 @@
/**
* @description
*/
-public class DynamicMultipleDBSink extends AbstractMultiTableSink implements IAfterConfigurableRefreshListener {
+public class DynamicMultipleDBSink extends AbstractMultiTableSink {
private static final long serialVersionUID = -4570659943689358381L;
String logicTableName;
@@ -35,6 +33,12 @@ public class DynamicMultipleDBSink extends AbstractMultiTableSink implements IAf
public DynamicMultipleDBSink() {
}
+ public DynamicMultipleDBSink(String url, String userName, String password, String logicTableName, String fieldName) {
+ super(url, userName, password);
+ this.logicTableName = logicTableName;
+ this.fieldName = fieldName;
+ }
+
public String getLogicTableName() {
return logicTableName;
}
@@ -51,12 +55,6 @@ public void setFieldName(String fieldName) {
this.fieldName = fieldName;
}
- public DynamicMultipleDBSink(String url, String userName, String password, String logicTableName, String fieldName) {
- super(url, userName, password);
- this.logicTableName = logicTableName;
- this.fieldName = fieldName;
- }
-
@Override
protected String createTableName(String splitId) {
return this.multiTableSplitFunction.createTableFromSplitId(splitId);
@@ -68,7 +66,7 @@ protected String createTableName(String splitId) {
}
@Override
- public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
+ protected boolean initConfigurable() {
if (this.multiTableSplitFunction == null) {
@@ -85,6 +83,7 @@ public String createTableFromSplitId(String splitId) {
};
}
+ return true;
}
}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
index d7897032..5dad2d0f 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
@@ -21,17 +21,13 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage;
-import org.apache.rocketmq.streams.common.component.AbstractComponent;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.configuration.ConfigurationKey;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
@@ -41,27 +37,25 @@
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.db.sink.sqltemplate.ISqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.SqlTemplateFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @description enhance db sink, support atomic sink and multiple sink
*/
public class EnhanceDBSink extends AbstractSink {
- static final Log logger = LogFactory.getLog(EnhanceDBSink.class);
+ static final Logger LOGGER = LoggerFactory.getLogger(EnhanceDBSink.class);
protected MetaData metaData;//either MetaData or insertSQL may be specified, choose one
protected String tableName; //the table to insert into
- boolean isAtomic = false; //是否原子写入
- boolean isMultiple = false; //是否多表
protected boolean isContainsId = false;
protected boolean openSqlCache = true;
-
/**
* for atomic sink. default is null
*/
protected String tmpTableName;
-
@ENVDependence
- protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+ protected String jdbcDriver = ConfigurationKey.DEFAULT_JDBC_DRIVER;
@ENVDependence
protected String url;
@ENVDependence
@@ -70,10 +64,10 @@ public class EnhanceDBSink extends AbstractSink {
protected String password;
@ENVDependence
protected String sqlMode;
-
protected transient IMessageCache sqlCache;//cache sql, batch submit sql
-
protected transient ISqlTemplate iSqlTemplate;
+ boolean isAtomic = false; //whether to write atomically
+ boolean isMultiple = false; //whether to write to multiple tables
public EnhanceDBSink() {
this(null, null, null, null);
@@ -100,47 +94,45 @@ public EnhanceDBSink(String url, String userName, String password, String tableN
@Override
protected boolean initConfigurable() {
- if(isAtomic && isMultiple){
+ if (isAtomic && isMultiple) {
String errMsg = String.format("atomic is not support multiple.");
- logger.error(errMsg);
+ LOGGER.error(errMsg);
throw new RuntimeException(errMsg);
}
//if multiple tables, create the partition tables from the logic table name
- if(isMultiple){
+ if (isMultiple) {
createMultiTable();
}
//if atomic write, create a temporary table from the result table
- if(isAtomic){
+ if (isAtomic) {
createTmpTable();
}
//if metadata is not set, look up the metadata in the db and create it
- if(metaData == null){
+ if (metaData == null) {
createMetaData();
}
- if(iSqlTemplate == null){
+ if (iSqlTemplate == null) {
try {
iSqlTemplate = SqlTemplateFactory.newSqlTemplate(sqlMode, metaData, isContainsId);
} catch (Exception e) {
- e.printStackTrace();
- logger.error(e);
+ LOGGER.error("get sql template error", e);
}
}
- if(openSqlCache){
+ if (openSqlCache) {
initSqlCache();
}
return super.initConfigurable();
}
-
- private void initSqlCache(){
+ private void initSqlCache() {
this.sqlCache = new MessageCache<>(sqls -> {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
- dataSource.executSqls(sqls);
+ dataSource.executeSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -157,53 +149,52 @@ private void initSqlCache(){
/**
* create
*/
- private void createMetaData(){
+ private void createMetaData() {
String realInsertTableName = isAtomic ? tmpTableName : tableName;
metaData = MetaDataUtils.createMetaData(url, userName, password, realInsertTableName);
}
- private void createMultiTable(){
+ private void createMultiTable() {
String logicTable = subStrLogicTableName(tableName);
copyAndCreateTableSchema(logicTable, tableName);
}
- private void createTmpTable(){
+ private void createTmpTable() {
String tmpTable = createTmpTableName(tableName);
copyAndCreateTableSchema(tableName, tmpTable);
}
- private void copyAndCreateTableSchema(String sourceTableName, String targetTableName){
+ private void copyAndCreateTableSchema(String sourceTableName, String targetTableName) {
List<String> tables = MetaDataUtils.listTableNameByPattern(url, userName, password, targetTableName);
- if(tables == null || tables.size() == 0){
+ if (tables == null || tables.size() == 0) {
String createTableSql = getCreateTableSqlFromOther(sourceTableName, tableName);
createTable(createTableSql);
}
}
- private final String getCreateTableSqlFromOther(String sourceTableName, String targetTableName){
+ private final String getCreateTableSqlFromOther(String sourceTableName, String targetTableName) {
String createTableSql = MetaDataUtils.getCreateTableSqlByTableName(url, userName, password, sourceTableName);
createTableSql = createTableSql.replace(sourceTableName, targetTableName);
- logger.info(String.format("createTableSql is %s", createTableSql));
+ LOGGER.info(String.format("createTableSql is %s", createTableSql));
return createTableSql;
}
- private final String subStrLogicTableName(String realTableName){
+ private final String subStrLogicTableName(String realTableName) {
int len = realTableName.lastIndexOf("_");
String logicTableName = realTableName.substring(0, len);
return logicTableName;
}
- private final String createTmpTableName(String tableName){
+ private final String createTmpTableName(String tableName) {
return "tmp" + "_" + tableName;
}
/**
- *
* @param createTableSql
*/
- private final void createTable(String createTableSql){
+ private final void createTable(String createTableSql) {
ORMUtil.executeSQL(url, userName, password, createTableSql, null);
}
@@ -214,14 +205,15 @@ protected boolean batchInsert(List messages) {
return false;
}
- private String genInsertSql(List<IMessage> messages){
+ private String genInsertSql(List<IMessage> messages) {
String sql = iSqlTemplate.createSql(convertJsonObjectFromMessage(messages));
return sql;
}
- protected List<JSONObject> convertJsonObjectFromMessage(List<IMessage> messageList){
+ @Override
+ protected List<JSONObject> convertJsonObjectFromMessage(List<IMessage> messageList) {
List<JSONObject> messages = new ArrayList<>();
- for(IMessage message:messageList){
+ for (IMessage message : messageList) {
messages.add(message.getMessageBody());
}
return messages;
@@ -234,7 +226,7 @@ protected void executeSQL(String sql) {
JDBCDriver dbDataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dbDataSource.execute(sql);
- }finally {
+ } finally {
dbDataSource.destroy();
}
}
@@ -336,21 +328,21 @@ public void setJdbcDriver(String jdbcDriver) {
this.jdbcDriver = jdbcDriver;
}
- public void rename(String suffix){
+ public void rename(String suffix) {
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
String rename1 = String.format("rename table %s to %s", tableName, tmpTableName.replace("tmp_", "re_") + "_" + suffix + "_" + format.format(new Date()));
String rename2 = String.format("rename table %s to %s", tmpTableName, tableName);
- logger.info(String.format("exec rename1 %s", rename1));
- logger.info(String.format("exec rename2 %s", rename2));
+ LOGGER.info(String.format("exec rename1 %s", rename1));
+ LOGGER.info(String.format("exec rename2 %s", rename2));
ORMUtil.executeSQL(rename1, null);
ORMUtil.executeSQL(rename2, null);
}
@Override
- public void atomicSink(ISystemMessage iMessage){
- if(isAtomic){
- ChangeTableNameMessage message = (ChangeTableNameMessage)iMessage;
+ public void atomicSink(ISystemMessage iMessage) {
+ if (isAtomic) {
+ ChangeTableNameMessage message = (ChangeTableNameMessage) iMessage;
rename(message.getScheduleCycle());
try {
super.finish();
@@ -359,12 +351,4 @@ public void atomicSink(ISystemMessage iMessage){
}
}
}
-
- public static boolean isAtomicConfiguration(){
- String isAtomicDBSink = ComponentCreator.getProperties().getProperty(ConfigureFileKey.IS_ATOMIC_DB_SINK);
- if(isAtomicDBSink == null){
- return false;
- }
- return Boolean.parseBoolean(isAtomicDBSink);
- }
}
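The `rename` method above realizes the atomic sink: rows land in a `tmp_` table, then two `RENAME TABLE` statements archive the live table and promote the temporary one in its place. A self-contained sketch of that statement construction (the timestamp suffix from the original is omitted; class and table names are illustrative):

```java
public class AtomicSwapSketch {
    // Build the two rename statements: archive the live table under a
    // re_ prefix, then promote the tmp_ table to the live name.
    static String[] renameStatements(String tableName, String tmpTableName, String suffix) {
        String archived = tmpTableName.replace("tmp_", "re_") + "_" + suffix;
        return new String[] {
            String.format("rename table %s to %s", tableName, archived),
            String.format("rename table %s to %s", tmpTableName, tableName)
        };
    }

    public static void main(String[] args) {
        for (String sql : renameStatements("t_result", "tmp_t_result", "20220101")) {
            System.out.println(sql);
        }
    }
}
```

Because each `RENAME TABLE` is atomic in MySQL, readers never observe a half-written result table.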
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
index 709fb77b..33bfa16f 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
@@ -17,22 +17,15 @@
package org.apache.rocketmq.streams.db.sink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
-import org.apache.rocketmq.streams.common.utils.Base64Utils;
-import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
-public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfigurableRefreshListener {
- protected String multiTableSplitFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
+public class SelfMultiTableSink extends AbstractMultiTableSink {
protected transient MultiTableSplitFunction multiTableSplitFunction;
public SelfMultiTableSink(String url, String userName, String password, MultiTableSplitFunction multiTableSplitFunction) {
super(url, userName, password);
this.multiTableSplitFunction = multiTableSplitFunction;
- byte[] bytes = InstantiationUtil.serializeObject(multiTableSplitFunction);
- multiTableSplitFunctionSerializeValue = Base64Utils.encode(bytes);
}
@Override
@@ -45,9 +38,4 @@ protected String createTableName(String splitId) {
return multiTableSplitFunction.createSplit(message);
}
- @Override
- public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
- byte[] bytes = Base64Utils.decode(multiTableSplitFunctionSerializeValue);
- this.multiTableSplitFunction = InstantiationUtil.deserializeObject(bytes);
- }
}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/ISqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/ISqlTemplate.java
index 9d2007ae..f02c4ed6 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/ISqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/ISqlTemplate.java
@@ -28,7 +28,7 @@ public interface ISqlTemplate {
public static final String SQL_MODE_DUPLICATE = "duplicate";
public static final String SQL_MODE_IGNORE = "ignore";
- static final String[] SUPPORTS = new String[]{
+ static final String[] SUPPORTS = new String[] {
ISqlTemplate.SQL_MODE_DEFAULT,
SQL_MODE_DUPLICATE,
SQL_MODE_IGNORE
@@ -37,10 +37,11 @@ public interface ISqlTemplate {
/**
* create sql prefix
* eg :
- * insert into table(`a`,`b`,`c`)
- * or insert ignore into table(`a`, `b`, `c`)
- * or insert into table(`a`,`b`,`c`) on duplicate key update
+ * insert into table(`a`,`b`,`c`)
+ * or insert ignore into table(`a`, `b`, `c`)
+ * or insert into table(`a`,`b`,`c`) on duplicate key update
* `a` = values(`a`), `b` = values(`b`), `c` = values(`c`)
+ *
* @return
*/
void initSqlTemplate();
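The three prefixes described in this javadoc map one-to-one onto the template classes below (`MysqlInsertIntoSqlTemplate`, `MysqlInsertIgnoreIntoSqlTemplate`, `MysqlInsertIntoWithDuplicateKeySqlTemplate`). A rough sketch of the mode dispatch, with the SQL written out directly rather than built via `SQLUtil` (names other than the mode strings are illustrative):

```java
public class SqlModeSketch {
    // Prefix for each supported mode; "duplicate" additionally needs an
    // "on duplicate key update ..." suffix after the values segment.
    static String sqlPrefix(String mode, String table, String columns) {
        switch (mode.toLowerCase()) {
            case "default":
                return "insert into " + table + "(" + columns + ")";
            case "ignore":
                return "insert ignore into " + table + "(" + columns + ")";
            case "duplicate":
                return "insert into " + table + "(" + columns + ")";
            default:
                throw new IllegalArgumentException("unsupported type " + mode);
        }
    }

    public static void main(String[] args) {
        // prints: insert ignore into t_user(`a`,`b`)
        System.out.println(sqlPrefix("ignore", "t_user", "`a`,`b`"));
    }
}
```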
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIgnoreIntoSqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIgnoreIntoSqlTemplate.java
index cbd50a16..04f8c90d 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIgnoreIntoSqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIgnoreIntoSqlTemplate.java
@@ -22,7 +22,7 @@
/**
* @description
*/
-public class MysqlInsertIgnoreIntoSqlTemplate extends MysqlInsertIntoSqlTemplate{
+public class MysqlInsertIgnoreIntoSqlTemplate extends MysqlInsertIntoSqlTemplate {
public MysqlInsertIgnoreIntoSqlTemplate(MetaData metaData, boolean isContainsId) {
super(metaData, isContainsId);
@@ -30,7 +30,7 @@ public MysqlInsertIgnoreIntoSqlTemplate(MetaData metaData, boolean isContainsId)
@Override
public void initSqlTemplate() {
- if(sqlPrefix != null){
+ if (sqlPrefix != null) {
return;
}
sqlPrefix = SQLUtil.createInsertIgnoreSegment(metaData, isContainsId);
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
index c0150afa..d0bee49c 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
@@ -21,13 +21,8 @@
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
-/**
- * @author zengyu.cw
- * @program apache-rocketmq-streams-fork
- * @create 2021-09-28 13:49:40
- * @description createDuplicateKeyUpdateSegment
- */
-public class MysqlInsertIntoWithDuplicateKeySqlTemplate extends MysqlInsertIntoSqlTemplate{
+
+public class MysqlInsertIntoWithDuplicateKeySqlTemplate extends MysqlInsertIntoSqlTemplate {
String sqlSuffix;
@@ -36,17 +31,17 @@ public MysqlInsertIntoWithDuplicateKeySqlTemplate(MetaData metaData, boolean isC
}
@Override
- public void initSqlTemplate(){
- if(sqlPrefix == null){
+ public void initSqlTemplate() {
+ if (sqlPrefix == null) {
sqlPrefix = SQLUtil.createInsertSegment(metaData, isContainsId);
}
- if(sqlSuffix == null){
+ if (sqlSuffix == null) {
sqlSuffix = SQLUtil.createDuplicateKeyUpdateSegment(metaData, isContainsId);
}
}
@Override
- public String createSql(List<? extends Map<String, Object>> rows){
+ public String createSql(List<? extends Map<String, Object>> rows) {
initSqlTemplate();
return String.join(" ", sqlPrefix, SQLUtil.createValuesSegment(metaData, rows, isContainsId), sqlSuffix);
}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
index 8837c9c6..00e86f68 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
@@ -26,13 +26,13 @@ public class SqlTemplateFactory {
public static ISqlTemplate newSqlTemplate(String type, MetaData metaData, boolean isContainsId) throws Exception {
- if(ISqlTemplate.SQL_MODE_DEFAULT.equalsIgnoreCase(type)){
+ if (ISqlTemplate.SQL_MODE_DEFAULT.equalsIgnoreCase(type)) {
return new MysqlInsertIntoSqlTemplate(metaData, isContainsId);
- }else if(ISqlTemplate.SQL_MODE_DUPLICATE.equalsIgnoreCase(type)){
+ } else if (ISqlTemplate.SQL_MODE_DUPLICATE.equalsIgnoreCase(type)) {
return new MysqlInsertIntoWithDuplicateKeySqlTemplate(metaData, isContainsId);
- }else if(ISqlTemplate.SQL_MODE_IGNORE.equalsIgnoreCase(type)){
+ } else if (ISqlTemplate.SQL_MODE_IGNORE.equalsIgnoreCase(type)) {
return new MysqlInsertIgnoreIntoSqlTemplate(metaData, isContainsId);
- }else{
+ } else {
throw new Exception(String.format("unsupported type %s, only support %s. ", type, Arrays.toString(ISqlTemplate.SUPPORTS)));
}
}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/AbstractDynamicSplitSource.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/AbstractDynamicSplitSource.java
new file mode 100644
index 00000000..fb916949
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/AbstractDynamicSplitSource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.source;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.CommonSplit;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.connectors.source.AbstractPullSource;
+
+public abstract class AbstractDynamicSplitSource extends AbstractPullSource {
+ protected int splitCount = 1;
+
+ @Override public List<ISplit<?, ?>> fetchAllSplits() {
+ List<ISplit<?, ?>> splits = new ArrayList<>();
+ for (int i = 0; i < splitCount; i++) {
+ splits.add(new CommonSplit(i + ""));
+ }
+ return splits;
+ }
+
+ public int getSplitCount() {
+ return splitCount;
+ }
+
+ public void setSplitCount(int splitCount) {
+ this.splitCount = splitCount;
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/CycleDynamicMultipleDBScanSource.java
similarity index 87%
rename from rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
rename to rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/CycleDynamicMultipleDBScanSource.java
index 561b48f2..20003462 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/CycleDynamicMultipleDBScanSource.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.connectors.source;
+package org.apache.rocketmq.streams.db.source;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
@@ -24,8 +24,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage;
@@ -40,6 +38,8 @@
import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
import org.apache.rocketmq.streams.db.CycleSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @description
@@ -47,7 +47,7 @@
public class CycleDynamicMultipleDBScanSource extends DynamicMultipleDBScanSource implements IBoundedSource, Serializable {
private static final long serialVersionUID = 6840988298037061128L;
- private static final Log logger = LogFactory.getLog(CycleDynamicMultipleDBScanSource.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(CycleDynamicMultipleDBScanSource.class);
Map<String, Boolean> initReaderMap = new ConcurrentHashMap<>();
CycleSchedule.Cycle cycle;
@@ -62,6 +62,12 @@ public CycleDynamicMultipleDBScanSource(CycleSchedule.Cycle cycle) {
this.cycle = cycle;
}
+ public static String createKey(ISource iSource) {
+ AbstractSource source = (AbstractSource) iSource;
+ CycleSchedule.Cycle cycle = ((CycleDynamicMultipleDBScanSource) iSource).getCycle();
+ return MapKeyUtil.createKey(source.getNameSpace(), source.getGroupName(), source.getName(), source.getTopic(), cycle.getCycleDateStr());
+ }
+
public AtomicInteger getSize() {
return size;
}
@@ -75,7 +81,7 @@ public void setSize(AtomicInteger size) {
*/
//todo
@Override
- public synchronized List<ISplit> fetchAllSplits() {
+ public synchronized List<ISplit<?, ?>> fetchAllSplits() {
if (this.filter == null) {
filter = new CycleScheduleFilter(cycle.getAllPattern());
@@ -88,13 +94,13 @@ public synchronized List fetchAllSplits() {
String sourceName = createKey(this);
List<String> tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
- logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
+ LOGGER.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
Iterator<String> it = tableNames.iterator();
while (it.hasNext()) {
String s = it.next();
String suffix = s.replace(logicTableName + "_", "");
if (filter.filter(sourceName, logicTableName, suffix)) {
- logger.info(String.format("filter add %s", s));
+ LOGGER.info(String.format("filter add %s", s));
CycleSplit split = new CycleSplit();
split.setLogicTableName(logicTableName);
split.setSuffix(suffix);
@@ -106,7 +112,7 @@ public synchronized List fetchAllSplits() {
size.incrementAndGet();
}
} else {
- logger.info(String.format("filter remove %s", s));
+ LOGGER.info(String.format("filter remove %s", s));
it.remove();
}
}
@@ -129,8 +135,8 @@ public void finish() {
for (Map.Entry<String, Boolean> entry : initReaderMap.entrySet()) {
String key = entry.getKey();
Boolean value = entry.getValue();
- if (value == false) {
- logger.error(String.format("split[%s] reader is not finish, exit with error. ", key));
+ if (!value) {
+ LOGGER.error(String.format("split[%s] reader is not finish, exit with error. ", key));
}
}
this.initReaderMap.clear();
@@ -142,9 +148,6 @@ public void finish() {
@Override
public boolean isFinished() {
List<ReaderStatus> readerStatuses = ReaderStatus.queryReaderStatusListBySourceName(createKey(this));
- if (readerStatuses == null) {
- return false;
- }
return readerStatuses.size() == size.get();
}
@@ -154,20 +157,20 @@ protected ISplitReader createSplitReader(ISplit iSplit) {
}
private void sendChangeTableNameMessage() {
- logger.info(String.format("start send change table name message."));
+ LOGGER.info(String.format("start send change table name message."));
ChangeTableNameMessage changeTableNameMessage = new ChangeTableNameMessage();
changeTableNameMessage.setScheduleCycle(cycle.getCycleDateStr());
Message message = createMessage(new JSONObject(), null, null, false);
message.setSystemMessage(changeTableNameMessage);
message.getHeader().setSystemMessage(true);
executeMessage(message);
- logger.info(String.format("finish send change table name message."));
+ LOGGER.info(String.format("finish send change table name message."));
}
@Override
public synchronized void boundedFinishedCallBack(ISplit iSplit) {
this.initReaderMap.put(iSplit.getQueueId(), true);
- logger.info(String.format("current map is %s, key is %s. ", initReaderMap, iSplit.getQueueId()));
+ LOGGER.info(String.format("current map is %s, key is %s. ", initReaderMap, iSplit.getQueueId()));
if (statusCheckerStart.compareAndSet(false, true)) {
Thread thread = new Thread(new Runnable() {
@Override
@@ -175,7 +178,7 @@ public void run() {
while (!isFinished()) {
ThreadUtil.sleep(3 * 1000);
}
- logger.info(String.format("source will be closed."));
+ LOGGER.info(String.format("source will be closed."));
sendChangeTableNameMessage(); // send the table-name change message downstream
ThreadUtil.sleep(1 * 1000);
finish();
@@ -204,10 +207,4 @@ public synchronized int getTotalReader() {
return size.get();
}
- public static String createKey(ISource iSource) {
- AbstractSource source = (AbstractSource) iSource;
- CycleSchedule.Cycle cycle = ((CycleDynamicMultipleDBScanSource) iSource).getCycle();
- return MapKeyUtil.createKey(source.getNameSpace(), source.getGroupName(), source.getConfigureName(), source.getTopic(), cycle.getCycleDateStr());
- }
-
}
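The `boundedFinishedCallBack` above tracks per-split completion in a map, uses a `compareAndSet` guard so the watcher thread is spawned at most once, and compares finished readers against the expected split count. A minimal standalone sketch of that scheme (class and method names here are illustrative, not part of the RocketMQ Streams API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the completion-tracking pattern used by
// CycleDynamicMultipleDBScanSource: each split reader flips its flag when it
// finishes, and a one-shot guard mirrors statusCheckerStart.compareAndSet.
public class BoundedCompletionTracker {
    private final Map<String, Boolean> readerDone = new ConcurrentHashMap<>();
    private final AtomicInteger totalSplits = new AtomicInteger();
    private final AtomicBoolean watcherStarted = new AtomicBoolean(false);

    public void registerSplit(String queueId) {
        readerDone.put(queueId, false);
        totalSplits.incrementAndGet();
    }

    public void markFinished(String queueId) {
        readerDone.put(queueId, true);
    }

    // All splits finished when the count of true flags equals the split total,
    // analogous to comparing ReaderStatus rows against size.get().
    public boolean isFinished() {
        long done = readerDone.values().stream().filter(Boolean::booleanValue).count();
        return done == totalSplits.get();
    }

    // Returns true only for the first caller, so the polling thread that
    // eventually sends the "change table name" system message starts once.
    public boolean tryStartWatcher() {
        return watcherStarted.compareAndSet(false, true);
    }
}
```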
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/DBScanReader.java
similarity index 76%
rename from rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
rename to rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/DBScanReader.java
index 268e891e..5558b9df 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/source/DBScanReader.java
@@ -14,56 +14,70 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.connectors.reader;
+package org.apache.rocketmq.streams.db.source;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configuration.ConfigurationKey;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.ThreadUtil;
import org.apache.rocketmq.streams.connectors.IBoundedSource;
import org.apache.rocketmq.streams.connectors.IBoundedSourceReader;
import org.apache.rocketmq.streams.connectors.model.PullMessage;
import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
-import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @description
*/
public class DBScanReader implements ISplitReader, IBoundedSourceReader, Serializable {
- private static final long serialVersionUID = 8172403250050893288L;
- private static final Log logger = LogFactory.getLog(DBScanReader.class);
static final String sqlTemplate = "select * from %s where id >= %d and id < %d";
-
+ private static final long serialVersionUID = 8172403250050893288L;
+ private static final Logger LOGGER = LoggerFactory.getLogger(DBScanReader.class);
// whether the source's finish callback has been completed
- transient volatile boolean isFinishedCall = false;
- ISource iSource;
- String url;
- String userName;
- String password;
- String tableName;
- int batchSize;
- long offset;
- long offsetStart;
- long offsetEnd;
- long maxOffset;
- long minOffset;
- ISplit iSplit;
- transient List<PullMessage> pullMessages;
- volatile boolean interrupt = false;
- volatile boolean isClosed = false;
+ private boolean isFinishedCall = false;
+ private ISource<?> iSource;
+ private String url;
+ private String userName;
+ private String password;
+ private final ThreadLocal<JDBCDriver> threadLocal = new ThreadLocal<>() {
+
+ @Override
+ public JDBCDriver initialValue() {
+ LOGGER.info(String.format("%s initial jdbcDriver. ", Thread.currentThread().getName()));
+ return DriverBuilder.createDriver(ConfigurationKey.DEFAULT_JDBC_DRIVER, url, userName, password);
+ }
+
+ };
+ private String tableName;
+ private int batchSize;
+ private long offset;
+ private long offsetStart;
+ private long offsetEnd;
+ private long maxOffset;
+ private long minOffset;
+ private ISplit<?, ?> iSplit;
+ private List<PullMessage<JSONObject>> pullMessages;
+ private boolean interrupt = false;
+ private boolean isClosed = false;
+
+ public DBScanReader() {
+
+ }
public String getUrl() {
return url;
@@ -105,38 +119,23 @@ public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
- public ISplit getISplit() {
+ public ISplit<?, ?> getISplit() {
return iSplit;
}
- public void setISplit(ISplit iSplit) {
+ public void setISplit(ISplit<?, ?> iSplit) {
this.iSplit = iSplit;
}
- public DBScanReader() {
-
- }
-
- transient ThreadLocal<JDBCDriver> threadLocal = new ThreadLocal<JDBCDriver>() {
-
- @Override
- public JDBCDriver initialValue() {
- logger.info(String.format("%s initial jdbcDriver. ", Thread.currentThread().getName()));
- return DriverBuilder.createDriver(AbstractComponent.DEFAULT_JDBC_DRIVER, url, userName, password);
- }
-
- };
-
@Override
- public void open(ISplit split) {
- this.iSplit = split;
+ public void open() {
JDBCDriver jdbcDriver = threadLocal.get();
Map<String, Object> range = jdbcDriver.queryOneRow("select min(id) as min_id, max(id) as max_id from " + tableName);
minOffset = Long.parseLong(String.valueOf(range.get("min_id")));
maxOffset = Long.parseLong(String.valueOf(range.get("max_id")));
offsetStart = minOffset;
offset = minOffset;
- logger.info(String.format("table %s min id [ %d ], max id [ %d ]", tableName, minOffset, maxOffset));
+ LOGGER.info(String.format("table %s min id [ %d ], max id [ %d ]", tableName, minOffset, maxOffset));
pullMessages = new ArrayList<>();
}
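The `open()` method above discovers the id range once, and `sqlTemplate` then walks it in fixed-size windows so each query is bounded regardless of table size. A minimal sketch of that id-range batching, assuming a hypothetical helper (`buildBatchQueries` is not part of `DBScanReader`):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the keyset-style pagination driven by sqlTemplate:
// scan [minId, maxId] in batchSize-wide windows with an exclusive upper bound,
// mirroring "select * from %s where id >= %d and id < %d".
public class IdRangeScan {
    static final String SQL_TEMPLATE = "select * from %s where id >= %d and id < %d";

    public static List<String> buildBatchQueries(String table, long minId, long maxId, int batchSize) {
        List<String> queries = new ArrayList<>();
        // Upper bound is exclusive, so keep scanning until the window start passes maxId.
        for (long start = minId; start <= maxId; start += batchSize) {
            queries.add(String.format(SQL_TEMPLATE, table, start, start + batchSize));
        }
        return queries;
    }
}
```

Each window is a cheap primary-key range scan, which is why the reader can checkpoint progress with a single `offset` value between batches.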
@@ -153,15 +152,15 @@ public boolean next() {
JDBCDriver jdbcDriver = threadLocal.get();
offsetEnd = offsetStart + batchSize;
String batchQuery = String.format(sqlTemplate, tableName, offsetStart, offsetEnd);
- logger.debug(String.format("execute sql : %s", batchQuery));
+ LOGGER.debug(String.format("execute sql : %s", batchQuery));
List