Skip to content

Commit f479264

Browse files
committed
add milvus reader plugin
1 parent 906bf3d commit f479264

File tree

18 files changed

+874
-32
lines changed

18 files changed

+874
-32
lines changed

milvusreader/pom.xml

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>com.alibaba.datax</groupId>
8+
<artifactId>datax-all</artifactId>
9+
<version>0.0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>milvusreader</artifactId>
13+
14+
<properties>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<kotlin.code.style>official</kotlin.code.style>
17+
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
18+
</properties>
19+
<dependencyManagement>
20+
<dependencies>
21+
<dependency>
22+
<artifactId>guava</artifactId>
23+
<groupId>com.google.guava</groupId>
24+
<version>32.0.1-jre</version>
25+
</dependency>
26+
</dependencies>
27+
</dependencyManagement>
28+
29+
<!-- Module dependencies of the milvusreader plugin. -->
<dependencies>
30+
<!-- Milvus Java SDK: the client library this reader uses to talk to Milvus. -->
<dependency>
31+
<groupId>io.milvus</groupId>
32+
<artifactId>milvus-sdk-java</artifactId>
33+
<version>2.4.8</version>
34+
</dependency>
35+
<!-- Test-scoped only: Kotlin test bindings for JUnit 5. -->
<dependency>
36+
<groupId>org.jetbrains.kotlin</groupId>
37+
<artifactId>kotlin-test-junit5</artifactId>
38+
<version>2.0.0</version>
39+
<scope>test</scope>
40+
</dependency>
41+
<!-- Test-scoped only: JUnit Jupiter. -->
<dependency>
42+
<groupId>org.junit.jupiter</groupId>
43+
<artifactId>junit-jupiter</artifactId>
44+
<version>5.10.0</version>
45+
<scope>test</scope>
46+
</dependency>
47+
<!-- Kotlin standard library at the default (compile) scope.
     NOTE(review): confirm this module really needs it; if it contains no Kotlin sources this may be a leftover. -->
<dependency>
48+
<groupId>org.jetbrains.kotlin</groupId>
49+
<artifactId>kotlin-stdlib</artifactId>
50+
<version>2.0.0</version>
51+
</dependency>
52+
<!-- DataX common module, same version as the parent project. -->
<dependency>
53+
<groupId>com.alibaba.datax</groupId>
54+
<artifactId>datax-common</artifactId>
55+
<version>0.0.1-SNAPSHOT</version>
56+
<scope>compile</scope>
57+
</dependency>
58+
<!-- Lombok with "provided" scope: on the compile classpath only, not a runtime dependency. -->
<dependency>
59+
<groupId>org.projectlombok</groupId>
60+
<artifactId>lombok</artifactId>
61+
<version>1.18.30</version>
62+
<scope>provided</scope>
63+
</dependency>
64+
</dependencies>
65+
66+
<build>
67+
<resources>
68+
<!-- Also copy the resource directory into target. Filtering is enabled, so Maven
     substitutes ${...} property placeholders in the copied files. -->
69+
<resource>
70+
<directory>src/main/resources</directory>
71+
<includes>
72+
<include>**/*.*</include>
73+
</includes>
74+
<filtering>true</filtering>
75+
</resource>
76+
</resources>
77+
<plugins>
78+
<!-- compiler plugin -->
79+
<plugin>
80+
<artifactId>maven-compiler-plugin</artifactId>
81+
<configuration>
82+
<source>${jdk-version}</source>
83+
<target>${jdk-version}</target>
84+
<encoding>${project-sourceEncoding}</encoding>
85+
</configuration>
86+
</plugin>
87+
<!-- assembly plugin -->
88+
<plugin>
89+
<artifactId>maven-assembly-plugin</artifactId>
90+
<configuration>
91+
<descriptors>
92+
<descriptor>src/main/assembly/package.xml</descriptor>
93+
</descriptors>
94+
<finalName>datax</finalName>
95+
</configuration>
96+
<executions>
97+
<execution>
98+
<id>dwzip</id>
99+
<phase>package</phase>
100+
<goals>
101+
<goal>single</goal>
102+
</goals>
103+
</execution>
104+
</executions>
105+
</plugin>
106+
</plugins>
107+
</build>
108+
109+
</project>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<assembly
2+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
5+
<id></id>
6+
7+
<formats>
8+
<format>dir</format>
9+
</formats>
10+
<includeBaseDirectory>false</includeBaseDirectory>
11+
<fileSets>
12+
<fileSet>
13+
<directory>src/main/resources</directory>
14+
<includes>
15+
<include>plugin.json</include>
16+
<include>plugin_job_template.json</include>
17+
</includes>
18+
<outputDirectory>plugin/reader/milvusreader</outputDirectory>
19+
</fileSet>
20+
<fileSet>
    <directory>target/</directory>
    <includes>
        <!-- The plugin jar is named after this module's artifactId (milvusreader)
             plus the project version. Naming the milvuswriter jar here would leave
             plugin/reader/milvusreader without the reader's own classes. -->
        <include>milvusreader-0.0.1-SNAPSHOT.jar</include>
    </includes>
    <outputDirectory>plugin/reader/milvusreader</outputDirectory>
</fileSet>
27+
</fileSets>
28+
29+
<dependencySets>
30+
<dependencySet>
31+
<useProjectArtifact>false</useProjectArtifact>
32+
<outputDirectory>plugin/reader/milvusreader/libs</outputDirectory>
33+
<scope>runtime</scope>
34+
</dependencySet>
35+
</dependencySets>
36+
</assembly>
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
### Datax MilvusReader
2+
#### 1 快速介绍
3+
4+
MilvusReader 插件利用 Milvus 的 Java 客户端 MilvusClient 进行 Milvus 的读操作。
5+
6+
#### 2 实现原理
7+
8+
MilvusReader通过Datax框架从Milvus读取数据,通过主控的JOB程序按照指定的规则对Milvus中的数据进行分片,并行读取,然后将Milvus支持的类型通过逐一判断转换成Datax支持的类型。
9+
10+
#### 3 功能说明
11+
* 该示例从Milvus读一份Collection数据到另一个Milvus。
12+
```json
13+
{
14+
"job": {
15+
"content": [
16+
{
17+
"reader": {
18+
"name": "milvusreader",
19+
"parameter": {
20+
"uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
21+
"token": "*****",
22+
"collection": "medium_articles",
23+
"batchSize": 10
24+
}
25+
},
26+
"writer": {
27+
"name": "milvuswriter",
28+
"parameter": {
29+
"uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
30+
"token": "*****",
31+
"collection": "medium_articles",
32+
"batchSize": 10,
33+
"column": [
34+
{
35+
"name": "id",
36+
"type": "Int64",
37+
"isPrimaryKey": true
38+
},
39+
{
40+
"name": "title_vector",
41+
"type": "FloatVector",
42+
"dimension": 768
43+
},
44+
{
45+
"name": "title",
46+
"type": "VarChar",
47+
"maxLength": 1000
48+
},
49+
{
50+
"name": "link",
51+
"type": "VarChar",
52+
"maxLength": 1000
53+
},
54+
{
55+
"name": "reading_time",
56+
"type": "Int64"
57+
},
58+
{
59+
"name": "publication",
60+
"type": "VarChar",
61+
"maxLength": 1000
62+
},
63+
{
64+
"name": "claps",
65+
"type": "Int64"
66+
},
67+
{
68+
"name": "responses",
69+
"type": "Int64"
70+
}
71+
]
72+
}
73+
}
74+
}
75+
],
76+
"setting": {
77+
"speed": {
78+
"channel": 1
79+
}
80+
}
81+
}
82+
}
83+
```
84+
85+
#### 4 参数说明
86+
87+
* uri: Milvus Cluster endpoint。【必填】
88+
* token: Milvus 的连接 token。【必填】
89+
* collection: 读取数据的collection。【必填】
90+
* partition: 读取数据的partition。【选填】
91+
* batchSize: 每次读取数据的行数。【选填】
92+
93+
#### 5 类型转换
94+
95+
| DataX 内部类型| Milvus 数据类型 |
96+
| -------- |-----------------|
97+
| Long | int |
98+
| Double | double |
99+
| String | string, varchar |
100+
| Boolean | bool |
101+
102+
- 当前暂不支持读取 dynamic schema 的数据,也暂不支持按 partition 读取。
103+
104+
#### 6 性能报告
105+
#### 7 测试报告
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
/*
18+
* Licensed to the Apache Software Foundation (ASF) under one or more
19+
* contributor license agreements. See the NOTICE file distributed with
20+
* this work for additional information regarding copyright ownership.
21+
* The ASF licenses this file to You under the Apache License, Version 2.0
22+
* (the "License"); you may not use this file except in compliance with
23+
* the License. You may obtain a copy of the License at
24+
*
25+
* http://www.apache.org/licenses/LICENSE-2.0
26+
*
27+
* Unless required by applicable law or agreed to in writing, software
28+
* distributed under the License is distributed on an "AS IS" BASIS,
29+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30+
* See the License for the specific language governing permissions and
31+
* limitations under the License.
32+
*/
33+
34+
package com.alibaba.datax.plugin.reader.milvusreader;
35+
36+
import java.nio.Buffer;
37+
import java.nio.ByteBuffer;
38+
39+
/**
 * Conversions between boxed numeric arrays and {@link ByteBuffer}s.
 *
 * <p>The {@code toByteBuffer} overloads allocate a new heap buffer, write every element with the
 * buffer's default byte order (big-endian for a freshly allocated {@code ByteBuffer}) and flip it
 * so it is ready for reading. A {@code null} array or {@code null} element causes a
 * {@link NullPointerException}.
 *
 * <p>The {@code to*Array} methods decode the <em>readable</em> bytes of the given buffer, i.e. the
 * bytes between its position and its limit, using the buffer's current byte order. They use
 * relative reads, so the buffer's position is advanced past every decoded element. Trailing bytes
 * that do not form a whole element are left unread.
 */
public class BufferUtils {

    /**
     * Encodes the given shorts into a new buffer, two bytes per element.
     *
     * @param shortArray values to encode; must not be or contain {@code null}
     * @return a flipped buffer holding exactly {@code shortArray.length * 2} readable bytes
     */
    public static ByteBuffer toByteBuffer(Short[] shortArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * Short.BYTES);

        for (Short value : shortArray) {
            byteBuffer.putShort(value);
        }

        // ByteBuffer overrides flip() with a covariant return type since JDK 9. Calling it
        // through Buffer keeps code compiled on a newer JDK linkable on a Java 8 runtime.
        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the readable bytes of the buffer as shorts.
     *
     * <p>The element count is derived from {@link ByteBuffer#remaining()} rather than the capacity,
     * so a buffer whose limit is below its capacity (or whose position is not zero) is decoded
     * instead of failing with a {@link java.nio.BufferUnderflowException}.
     *
     * @param byteBuffer source buffer; its position is advanced past the decoded elements
     * @return the decoded values, possibly empty
     */
    public static Short[] toShortArray(ByteBuffer byteBuffer) {
        Short[] shortArray = new Short[byteBuffer.remaining() / Short.BYTES];

        for (int i = 0; i < shortArray.length; i++) {
            shortArray[i] = byteBuffer.getShort();
        }

        return shortArray;
    }

    /**
     * Encodes the given floats into a new buffer, four bytes per element.
     *
     * @param floatArray values to encode; must not be or contain {@code null}
     * @return a flipped buffer holding exactly {@code floatArray.length * 4} readable bytes
     */
    public static ByteBuffer toByteBuffer(Float[] floatArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * Float.BYTES);

        for (float value : floatArray) {
            byteBuffer.putFloat(value);
        }

        // See toByteBuffer(Short[]) for why flip() is invoked through Buffer.
        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the readable bytes of the buffer as floats.
     *
     * @param byteBuffer source buffer; its position is advanced past the decoded elements
     * @return the decoded values, sized from {@link ByteBuffer#remaining()}, possibly empty
     */
    public static Float[] toFloatArray(ByteBuffer byteBuffer) {
        Float[] floatArray = new Float[byteBuffer.remaining() / Float.BYTES];

        for (int i = 0; i < floatArray.length; i++) {
            floatArray[i] = byteBuffer.getFloat();
        }

        return floatArray;
    }

    /**
     * Encodes the given doubles into a new buffer, eight bytes per element.
     *
     * @param doubleArray values to encode; must not be or contain {@code null}
     * @return a flipped buffer holding exactly {@code doubleArray.length * 8} readable bytes
     */
    public static ByteBuffer toByteBuffer(Double[] doubleArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * Double.BYTES);

        for (double value : doubleArray) {
            byteBuffer.putDouble(value);
        }

        // See toByteBuffer(Short[]) for why flip() is invoked through Buffer.
        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the readable bytes of the buffer as doubles.
     *
     * @param byteBuffer source buffer; its position is advanced past the decoded elements
     * @return the decoded values, sized from {@link ByteBuffer#remaining()}, possibly empty
     */
    public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
        Double[] doubleArray = new Double[byteBuffer.remaining() / Double.BYTES];

        for (int i = 0; i < doubleArray.length; i++) {
            doubleArray[i] = byteBuffer.getDouble();
        }

        return doubleArray;
    }

    /**
     * Encodes the given ints into a new buffer, four bytes per element.
     *
     * @param intArray values to encode; must not be or contain {@code null}
     * @return a flipped buffer holding exactly {@code intArray.length * 4} readable bytes
     */
    public static ByteBuffer toByteBuffer(Integer[] intArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * Integer.BYTES);

        for (int value : intArray) {
            byteBuffer.putInt(value);
        }

        // See toByteBuffer(Short[]) for why flip() is invoked through Buffer.
        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the readable bytes of the buffer as ints.
     *
     * @param byteBuffer source buffer; its position is advanced past the decoded elements
     * @return the decoded values, sized from {@link ByteBuffer#remaining()}, possibly empty
     */
    public static Integer[] toIntArray(ByteBuffer byteBuffer) {
        Integer[] intArray = new Integer[byteBuffer.remaining() / Integer.BYTES];

        for (int i = 0; i < intArray.length; i++) {
            intArray[i] = byteBuffer.getInt();
        }

        return intArray;
    }
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.alibaba.datax.plugin.reader.milvusreader;
2+
3+
/**
 * Names of the configuration keys read from the {@code parameter} section of a
 * milvusreader job.
 */
public class KeyConstant {

    /** Key for the Milvus cluster endpoint. */
    public static final String URI = "uri";

    /** Key for the Milvus connection token. */
    public static final String TOKEN = "token";

    /** Key {@code "database"}; presumably selects the Milvus database (confirm against the reader). */
    public static final String DATABASE = "database";

    /** Key for the collection to read from. */
    public static final String COLLECTION = "collection";

    /** Key for the partition to read from. */
    public static final String PARTITION = "partition";

    /** Key for the number of rows fetched per read. */
    public static final String BATCH_SIZE = "batchSize";
}

0 commit comments

Comments
 (0)