Skip to content

Commit 906bf3d

Browse files
committed
add milvus writer plugin
1 parent 3614c26 commit 906bf3d

File tree

10 files changed

+576
-0
lines changed

10 files changed

+576
-0
lines changed

milvuswriter/pom.xml

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>com.alibaba.datax</groupId>
8+
<artifactId>datax-all</artifactId>
9+
<version>0.0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>milvuswriter</artifactId>
13+
14+
<properties>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<kotlin.code.style>official</kotlin.code.style>
17+
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
18+
</properties>
19+
<dependencyManagement>
20+
<dependencies>
21+
<dependency>
22+
<artifactId>guava</artifactId>
23+
<groupId>com.google.guava</groupId>
24+
<version>32.0.1-jre</version>
25+
</dependency>
26+
</dependencies>
27+
</dependencyManagement>
28+
29+
<dependencies>
30+
<dependency>
31+
<groupId>io.milvus</groupId>
32+
<artifactId>milvus-sdk-java</artifactId>
33+
<version>2.4.8</version>
34+
</dependency>
35+
<dependency>
36+
<groupId>org.jetbrains.kotlin</groupId>
37+
<artifactId>kotlin-test-junit5</artifactId>
38+
<version>2.0.0</version>
39+
<scope>test</scope>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.junit.jupiter</groupId>
43+
<artifactId>junit-jupiter</artifactId>
44+
<version>5.10.0</version>
45+
<scope>test</scope>
46+
</dependency>
47+
<dependency>
48+
<groupId>org.jetbrains.kotlin</groupId>
49+
<artifactId>kotlin-stdlib</artifactId>
50+
<version>2.0.0</version>
51+
</dependency>
52+
<dependency>
53+
<groupId>com.alibaba.datax</groupId>
54+
<artifactId>datax-common</artifactId>
55+
<version>0.0.1-SNAPSHOT</version>
56+
<scope>compile</scope>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.projectlombok</groupId>
60+
<artifactId>lombok</artifactId>
61+
<version>1.18.30</version>
62+
<scope>provided</scope>
63+
</dependency>
64+
</dependencies>
65+
66+
<build>
67+
<resources>
68+
<!--将resource目录也输出到target-->
69+
<resource>
70+
<directory>src/main/resources</directory>
71+
<includes>
72+
<include>**/*.*</include>
73+
</includes>
74+
<filtering>true</filtering>
75+
</resource>
76+
</resources>
77+
<plugins>
78+
<!-- compiler plugin -->
79+
<plugin>
80+
<artifactId>maven-compiler-plugin</artifactId>
81+
<configuration>
82+
<source>${jdk-version}</source>
83+
<target>${jdk-version}</target>
84+
<encoding>${project-sourceEncoding}</encoding>
85+
</configuration>
86+
</plugin>
87+
<!-- assembly plugin -->
88+
<plugin>
89+
<artifactId>maven-assembly-plugin</artifactId>
90+
<configuration>
91+
<descriptors>
92+
<descriptor>src/main/assembly/package.xml</descriptor>
93+
</descriptors>
94+
<finalName>datax</finalName>
95+
</configuration>
96+
<executions>
97+
<execution>
98+
<id>dwzip</id>
99+
<phase>package</phase>
100+
<goals>
101+
<goal>single</goal>
102+
</goals>
103+
</execution>
104+
</executions>
105+
</plugin>
106+
</plugins>
107+
</build>
108+
109+
</project>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<assembly
2+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
5+
<id></id>
6+
7+
<formats>
8+
<format>dir</format>
9+
</formats>
10+
<includeBaseDirectory>false</includeBaseDirectory>
11+
<fileSets>
12+
<fileSet>
13+
<directory>src/main/resources</directory>
14+
<includes>
15+
<include>plugin.json</include>
16+
<include>plugin_job_template.json</include>
17+
</includes>
18+
<outputDirectory>plugin/writer/milvuswriter</outputDirectory>
19+
</fileSet>
20+
<fileSet>
21+
<directory>target/</directory>
22+
<includes>
23+
<include>milvuswriter-0.0.1-SNAPSHOT.jar</include>
24+
</includes>
25+
<outputDirectory>plugin/writer/milvuswriter</outputDirectory>
26+
</fileSet>
27+
</fileSets>
28+
29+
<dependencySets>
30+
<dependencySet>
31+
<useProjectArtifact>false</useProjectArtifact>
32+
<outputDirectory>plugin/writer/milvuswriter/libs</outputDirectory>
33+
<scope>runtime</scope>
34+
</dependencySet>
35+
</dependencySets>
36+
</assembly>
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
/*
18+
* Licensed to the Apache Software Foundation (ASF) under one or more
19+
* contributor license agreements. See the NOTICE file distributed with
20+
* this work for additional information regarding copyright ownership.
21+
* The ASF licenses this file to You under the Apache License, Version 2.0
22+
* (the "License"); you may not use this file except in compliance with
23+
* the License. You may obtain a copy of the License at
24+
*
25+
* http://www.apache.org/licenses/LICENSE-2.0
26+
*
27+
* Unless required by applicable law or agreed to in writing, software
28+
* distributed under the License is distributed on an "AS IS" BASIS,
29+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30+
* See the License for the specific language governing permissions and
31+
* limitations under the License.
32+
*/
33+
34+
package com.alibaba.datax.plugin.writer.milvuswriter;
35+
36+
import java.nio.Buffer;
37+
import java.nio.ByteBuffer;
38+
39+
public class BufferUtils {
40+
41+
public static ByteBuffer toByteBuffer(Short[] shortArray) {
42+
ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
43+
44+
for (Short value : shortArray) {
45+
byteBuffer.putShort(value);
46+
}
47+
48+
// Compatible compilation and running versions are not consistent
49+
// Flip the buffer to prepare for reading
50+
((Buffer) byteBuffer).flip();
51+
52+
return byteBuffer;
53+
}
54+
55+
public static Short[] toShortArray(ByteBuffer byteBuffer) {
56+
Short[] shortArray = new Short[byteBuffer.capacity() / 2];
57+
58+
for (int i = 0; i < shortArray.length; i++) {
59+
shortArray[i] = byteBuffer.getShort();
60+
}
61+
62+
return shortArray;
63+
}
64+
65+
public static ByteBuffer toByteBuffer(Float[] floatArray) {
66+
ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
67+
68+
for (float value : floatArray) {
69+
byteBuffer.putFloat(value);
70+
}
71+
72+
((Buffer) byteBuffer).flip();
73+
74+
return byteBuffer;
75+
}
76+
77+
public static Float[] toFloatArray(ByteBuffer byteBuffer) {
78+
Float[] floatArray = new Float[byteBuffer.capacity() / 4];
79+
80+
for (int i = 0; i < floatArray.length; i++) {
81+
floatArray[i] = byteBuffer.getFloat();
82+
}
83+
84+
return floatArray;
85+
}
86+
87+
public static ByteBuffer toByteBuffer(Double[] doubleArray) {
88+
ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
89+
90+
for (double value : doubleArray) {
91+
byteBuffer.putDouble(value);
92+
}
93+
94+
((Buffer) byteBuffer).flip();
95+
96+
return byteBuffer;
97+
}
98+
99+
public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
100+
Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
101+
102+
for (int i = 0; i < doubleArray.length; i++) {
103+
doubleArray[i] = byteBuffer.getDouble();
104+
}
105+
106+
return doubleArray;
107+
}
108+
109+
public static ByteBuffer toByteBuffer(Integer[] intArray) {
110+
ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
111+
112+
for (int value : intArray) {
113+
byteBuffer.putInt(value);
114+
}
115+
116+
((Buffer) byteBuffer).flip();
117+
118+
return byteBuffer;
119+
}
120+
121+
public static Integer[] toIntArray(ByteBuffer byteBuffer) {
122+
Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
123+
124+
for (int i = 0; i < intArray.length; i++) {
125+
intArray[i] = byteBuffer.getInt();
126+
}
127+
128+
return intArray;
129+
}
130+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.alibaba.datax.plugin.writer.milvuswriter;
2+
3+
public class KeyConstant {
4+
public static final String URI = "uri";
5+
public static final String TOKEN = "token";
6+
public static final String DATABASE = "database";
7+
public static final String COLLECTION = "collection";
8+
public static final String AUTO_ID = "autoId";
9+
public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
10+
public static final String BATCH_SIZE = "batchSize";
11+
public static final String COLUMN = "column";
12+
public static final String COLUMN_TYPE = "type";
13+
public static final String COLUMN_NAME = "name";
14+
public static final String VECTOR_DIMENSION = "dimension";
15+
public static final String IS_PRIMARY_KEY = "isPrimaryKey";
16+
// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception"
17+
public static final String schemaCreateMode = "schemaCreateMode";
18+
public static final String IS_PARTITION_KEY = "isPartitionKey";
19+
public static final String MAX_LENGTH = "maxLength";
20+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.alibaba.datax.plugin.writer.milvuswriter;
2+
3+
import com.google.gson.JsonObject;
4+
import io.milvus.v2.client.MilvusClientV2;
5+
import io.milvus.v2.service.vector.request.UpsertReq;
6+
import lombok.extern.slf4j.Slf4j;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
11+
@Slf4j
12+
public class MilvusBufferWriter {
13+
14+
private final MilvusClientV2 milvusClientV2;
15+
private final String collection;
16+
private final Integer batchSize;
17+
private List<JsonObject> dataCache;
18+
19+
public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){
20+
this.milvusClientV2 = milvusClientV2;
21+
this.collection = collection;
22+
this.batchSize = batchSize;
23+
this.dataCache = new ArrayList<>();
24+
}
25+
public void write(JsonObject data){
26+
dataCache.add(data);
27+
}
28+
public Boolean needCommit(){
29+
return dataCache.size() >= batchSize;
30+
}
31+
public void commit(){
32+
if(dataCache.isEmpty()){
33+
log.info("dataCache is empty, skip commit");
34+
return;
35+
}
36+
UpsertReq upsertReq = UpsertReq.builder()
37+
.collectionName(collection)
38+
.data(dataCache)
39+
.build();
40+
milvusClientV2.upsert(upsertReq);
41+
dataCache = new ArrayList<>();
42+
}
43+
}

0 commit comments

Comments
 (0)