@nianliuu
Contributor

Overview

Add MilvusReader and MilvusWriter plugins to support reading data from and writing data to Milvus.

Code Change

Currently, DataX provides no support for vector data. This PR introduces the MilvusReader and MilvusWriter plugins, which support moving data between Milvus instances.
The following vector types are supported for now:

  • float vector
  • binary vector

Test

Tested with the DataX example job by migrating data from one Milvus instance to another.

Limitation

  • Moving dynamic schema data is not supported
  • Reading and writing by partition is not supported

@nianliuu force-pushed the master branch 3 times, most recently from d36fe0b to ef56ae7 on November 28, 2024 05:15
@Override
public void destroy() {
log.info("Closing Milvus writer, committing data and closing connection");
this.milvusBufferWriter.commit();

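I think `this.milvusBufferWriter.commit()` would be better placed after the `while` loop in `startWrite`, called once after checking that the buffer is not empty. That would be easier to understand and maintain.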

For reference: [image]
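
To illustrate the suggestion above, here is a minimal sketch of flushing the remaining buffer once at the end of the write loop rather than in `destroy()`. `BufferedSink`, its `String` rows, and the `Consumer` flusher are hypothetical stand-ins and are not the PR's actual `milvusBufferWriter` API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the PR's buffer writer; all names are illustrative.
class BufferedSink {
    private final int batchSize;
    private final Consumer<List<String>> flusher; // e.g. would wrap the upsert call
    private final List<String> buffer = new ArrayList<>();

    BufferedSink(int batchSize, Consumer<List<String>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Buffer one row and flush automatically when a full batch is reached.
    void add(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            commit();
        }
    }

    boolean isEmpty() {
        return buffer.isEmpty();
    }

    // Hand a copy of the buffered rows to the flusher, then clear the buffer.
    void commit() {
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Mirrors the shape of startWrite: drain the input, then flush the
    // remainder exactly once if anything is left over.
    static void startWrite(Iterator<String> records, BufferedSink sink) {
        while (records.hasNext()) {
            sink.add(records.next());
        }
        if (!sink.isEmpty()) {
            sink.commit();
        }
    }
}
```

With this shape, `destroy()` would only need to close the connection, and a failed final flush would surface while the task is still in its write phase.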

.collectionName(collection)
.data(dataCache)
.build();
milvusClientV2.upsert(upsertReq);

Should we consider retrying on failure here, with a configurable retry count and retry interval?
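
One possible shape for this, as a sketch only: a small retry helper with a fixed attempt count and a fixed sleep between attempts. `SimpleRetry`, `callWithRetry`, and both parameters are hypothetical names, not part of the PR or the Milvus SDK; the upsert call would be passed in as the `Callable`.

```java
import java.util.concurrent.Callable;

// Hypothetical helper; the name and parameters are assumptions for illustration.
class SimpleRetry {
    // Runs the action up to maxAttempts times, sleeping intervalMillis between
    // failed attempts, and rethrows the last failure if every attempt fails.
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long intervalMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Do not sleep after the final attempt.
                if (attempt < maxAttempts) {
                    Thread.sleep(intervalMillis);
                }
            }
        }
        throw last;
    }
}
```

The attempt count and interval could be read from the writer's job configuration so that each task can tune them.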

.uri(writerSliceConfig.getString(KeyConstant.URI))
.token(writerSliceConfig.getString(KeyConstant.TOKEN))
.build();
if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {

As I understand it, shouldn't the condition here be `!= null`?


@Getter
public enum SchemaCreateMode {
CREATE_WHEN_NOT_EXIST(0),

I'd suggest representing these with strings rather than ints; that would make the job configuration much more readable:
● createWhenTableNotExit
● Ignore
● recreate
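
A sketch of what string-valued modes might look like. `SchemaCreateModeSketch` and `fromConfig` are hypothetical names; only `CREATE_WHEN_NOT_EXIST` appears in the diff, so the other two constants are assumed from the list above, and the config strings are spelled exactly as written in this comment.

```java
import java.util.Locale;

// Hypothetical variant of the PR's enum, keyed by a readable string instead of an int.
enum SchemaCreateModeSketch {
    CREATE_WHEN_NOT_EXIST("createWhenTableNotExit"), // spelling taken from the review comment
    IGNORE("ignore"),
    RECREATE("recreate");

    private final String configValue;

    SchemaCreateModeSketch(String configValue) {
        this.configValue = configValue;
    }

    String getConfigValue() {
        return configValue;
    }

    // Case-insensitive lookup so "Ignore" and "ignore" both resolve.
    static SchemaCreateModeSketch fromConfig(String value) {
        if (value != null) {
            String wanted = value.trim().toLowerCase(Locale.ROOT);
            for (SchemaCreateModeSketch mode : values()) {
                if (mode.configValue.toLowerCase(Locale.ROOT).equals(wanted)) {
                    return mode;
                }
            }
        }
        throw new IllegalArgumentException("Unknown schema create mode: " + value);
    }
}
```

Rejecting unknown values with a clear error makes a mistyped job configuration fail at startup rather than silently falling back to a default mode.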

}
UpsertReq upsertReq = UpsertReq.builder()
.collectionName(collection)
.data(dataCache)

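It looks like this does not write to the configured partition and instead writes directly to the default partition. If a partition is configured, the data should be written to that partition.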

package com.alibaba.datax.plugin.writer.milvuswriter;

public class KeyConstant {
public static final String URI = "uri";

Please change `uri` to `endpoint`; DataX plugins generally use `endpoint` for this configuration parameter.
