Skip to content
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
75d0a10
impl VectorArray as an internal representation of proto::schema::Vect…
SpadeA-Tang May 28, 2025
1852c0d
parse schema from proto
SpadeA-Tang May 28, 2025
0fcbbc0
can read from sealed segment and growing segment
SpadeA-Tang May 29, 2025
f5ae9a5
update test
SpadeA-Tang May 30, 2025
56cd780
resolve conflict
SpadeA-Tang May 30, 2025
efcb89d
add vector array for storage
SpadeA-Tang May 30, 2025
2b2b296
add tests
SpadeA-Tang May 30, 2025
df24a8b
update code for proto name change
SpadeA-Tang Jun 3, 2025
be261df
update for proto change
SpadeA-Tang Jun 3, 2025
ee7b259
add struct array fields in collection
SpadeA-Tang Jun 3, 2025
e953909
add test
SpadeA-Tang Jun 3, 2025
25a7d6e
add tests
SpadeA-Tang Jun 3, 2025
b1a30b8
adjust for proto change
SpadeA-Tang Jun 4, 2025
15e1899
update for proto change
SpadeA-Tang Jun 4, 2025
8460594
update proto
SpadeA-Tang Jun 4, 2025
1d11cfa
fix test
SpadeA-Tang Jun 4, 2025
b66aaed
update comment; update proto
SpadeA-Tang Jun 4, 2025
2b52262
update proto and fix
SpadeA-Tang Jun 4, 2025
842d1c0
fix build error
SpadeA-Tang Jun 4, 2025
e6626d9
address comments
SpadeA-Tang Jun 4, 2025
770bed1
fix test
SpadeA-Tang Jun 4, 2025
41e6a71
fix test
SpadeA-Tang Jun 4, 2025
da5864f
fix test
SpadeA-Tang Jun 4, 2025
b211f3b
Merge branch 'emb-list-create-schema' into emb-list-insert-data
SpadeA-Tang Jun 4, 2025
0d91e50
Merge branch 'emb-list-array-vector-go' into emb-list-insert-data
SpadeA-Tang Jun 4, 2025
8354843
Merge branch 'emb-list-array-vector-segcore' into emb-list-insert-data
SpadeA-Tang Jun 4, 2025
60992fd
check and flatten struct data in pre execute
SpadeA-Tang Jun 4, 2025
c757a4c
ban nullable
SpadeA-Tang Jun 4, 2025
9897539
Merge branch 'emb-list-create-schema' into emb-list-insert-data
SpadeA-Tang Jun 4, 2025
580f00c
update schema helper
SpadeA-Tang Jun 4, 2025
167b83e
check and flatten struct array in pre-execute
SpadeA-Tang Jun 6, 2025
84b6dba
retrieve data:
SpadeA-Tang Jun 9, 2025
67ff93e
Merge branch 'master' into emb-list-insert-data
SpadeA-Tang Jun 9, 2025
839bff6
field id
SpadeA-Tang Jun 10, 2025
87d66ae
Merge branch 'master' into emb-list-create-schema
SpadeA-Tang Jun 11, 2025
29a75b9
rebase
SpadeA-Tang Jun 11, 2025
9102566
fix tests
SpadeA-Tang Jun 12, 2025
60b05d4
fix fmt
SpadeA-Tang Jun 12, 2025
2d9107c
Merge branch 'master' into emb-list-array-vector-go
SpadeA-Tang Jun 12, 2025
6c136f1
fix fielddata nil
SpadeA-Tang Jun 13, 2025
cbcb6b7
rebase
SpadeA-Tang Jun 16, 2025
3220abe
fix compile
SpadeA-Tang Jun 16, 2025
ac67bf9
merge storage impl in go
SpadeA-Tang Jun 19, 2025
550f5fa
Merge branch 'emb-list-create-schema' into emb-list-insert-data
SpadeA-Tang Jun 19, 2025
d19c90b
rebase
SpadeA-Tang Jun 19, 2025
7b43afa
rebase
SpadeA-Tang Jun 19, 2025
5e4e51d
add mix compaction test
SpadeA-Tang Jun 19, 2025
7526086
fix dynamic field flatten
SpadeA-Tang Jun 19, 2025
944299c
rebase
SpadeA-Tang Jun 19, 2025
a94ec0c
[skip e2e]rm redundant comment
SpadeA-Tang Jun 19, 2025
daf28ed
fix build error
SpadeA-Tang Jun 20, 2025
d316f20
polish code and add tests
SpadeA-Tang Jun 20, 2025
eae6d48
add comments
SpadeA-Tang Jun 23, 2025
dc0e1fd
rebase
SpadeA-Tang Jun 23, 2025
ad9920f
remove redundant test
SpadeA-Tang Jun 23, 2025
fdb9375
rebase
SpadeA-Tang Jun 23, 2025
e6c1e8d
fix test
SpadeA-Tang Jun 24, 2025
cbc5614
add checks
SpadeA-Tang Jun 24, 2025
5f61f54
address comment
SpadeA-Tang Jun 25, 2025
f83fb1e
rebase
SpadeA-Tang Jun 25, 2025
7743a98
address comments
SpadeA-Tang Jul 1, 2025
ad1f616
trigger ci
SpadeA-Tang Jul 1, 2025
aff06a6
Merge branch 'master' into emb-list-insert-data
SpadeA-Tang Jul 1, 2025
c7021cc
fix test compile
SpadeA-Tang Jul 1, 2025
335a968
rebase
SpadeA-Tang Jul 17, 2025
496e726
rebase
SpadeA-Tang Jul 21, 2025
abed064
fix test
SpadeA-Tang Jul 21, 2025
14528e7
rebase
SpadeA-Tang Jul 25, 2025
5a1c284
rebase
SpadeA-Tang Jul 25, 2025
ac56fed
rebase
SpadeA-Tang Jul 25, 2025
e111562
refine ValueDeserializer
SpadeA-Tang Jul 25, 2025
cade279
rebase
SpadeA-Tang Jul 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/tools/migration/meta/meta220.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (meta *TtCollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (m
if sourceVersion.LT(versions.Version220) {
opts = append(opts, model.WithFields())
opts = append(opts, model.WithPartitions())
opts = append(opts, model.WithStructArrayFields())
}

for collectionID := range *meta {
Expand Down Expand Up @@ -98,6 +99,7 @@ func (meta *CollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map
if sourceVersion.LT(versions.Version220) {
opts = append(opts, model.WithFields())
opts = append(opts, model.WithPartitions())
opts = append(opts, model.WithStructArrayFields())
}

for collectionID := range *meta {
Expand Down
3 changes: 2 additions & 1 deletion internal/core/unittest/test_utils/DataGen.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ GenerateRandomSparseFloatVector(size_t rows,
return tensor;
}

inline SchemaPtr CreateTestSchema() {
inline SchemaPtr
CreateTestSchema() {
auto schema = std::make_shared<milvus::Schema>();
auto bool_field =
schema->AddDebugField("bool", milvus::DataType::BOOL, true);
Expand Down
4 changes: 0 additions & 4 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,10 +786,6 @@ func isFlush(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
}

// needSync reports whether the segment is in a state — flushed, flushing, or
// sealed — that requires it to be synced.
func needSync(segment *SegmentInfo) bool {
	switch segment.GetState() {
	case commonpb.SegmentState_Flushed, commonpb.SegmentState_Flushing, commonpb.SegmentState_Sealed:
		return true
	default:
		return false
	}
}

// buckets will be updated inplace
func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
for i := len(small) - 1; i >= 0; i-- {
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
flushSegmentIDs := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
if segment != nil &&
(isFlushState(segment.GetState())) &&
isFlushState(segment.GetState()) &&
segment.GetLevel() != datapb.SegmentLevel_L0 && // SegmentLevel_Legacy, SegmentLevel_L1, SegmentLevel_L2
!sealedSegmentsIDDict[segment.GetID()] {
flushSegmentIDs = append(flushSegmentIDs, segment.GetID())
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/compactor/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func (t *clusteringCompactionTask) mappingSegment(
}

reader := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error {
return storage.ValueDeserializer(r, v, t.plan.Schema.Fields)
return storage.ValueDeserializer(r, v, t.plan.Schema)
})
defer reader.Close()

Expand Down Expand Up @@ -884,7 +884,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
}

pkIter := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error {
return storage.ValueDeserializer(r, v, t.plan.Schema.Fields)
return storage.ValueDeserializer(r, v, t.plan.Schema)
})
defer pkIter.Close()
analyzeResult, remained, err := t.iterAndGetScalarAnalyzeResult(pkIter, expiredFilter)
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/compactor/segment_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize

func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int,
) (writer *storage.BinlogSerializeWriter, closers []func() (*storage.Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields)
fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema)
closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {
closers = append(closers, w.Finalize)
Expand Down
6 changes: 6 additions & 0 deletions internal/datanode/importv2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,12 @@
fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
for _, structField := range schema.GetStructArrayFields() {
for _, subField := range structField.GetFields() {
fields[subField.GetFieldID()] = subField
}

Check warning on line 441 in internal/datanode/importv2/util.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/util.go#L439-L441

Added lines #L439 - L441 were not covered by tests
}

for fieldID, fd := range data.Data {
if fd.RowNum() == 0 && CanBeZeroRowField(fields[fieldID]) {
continue
Expand Down
4 changes: 2 additions & 2 deletions internal/flushcommon/syncmgr/pack_writer_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ func (bw *BulkPackWriterV2) serializeBinlog(ctx context.Context, pack *SyncPack)
if len(pack.insertData) == 0 {
return nil, nil
}
arrowSchema, err := storage.ConvertToArrowSchema(bw.schema.Fields)
arrowSchema, err := storage.ConvertToArrowSchema(bw.schema)
if err != nil {
return nil, err
}
builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
defer builder.Release()

for _, chunk := range pack.insertData {
if err := storage.BuildRecord(builder, chunk, bw.schema.GetFields()); err != nil {
if err := storage.BuildRecord(builder, chunk, bw.schema); err != nil {
return nil, err
}
}
Expand Down
60 changes: 59 additions & 1 deletion internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@
return fmt.Sprintf("%s/%d", BuildFunctionPrefix(collectionID), functionID)
}

// BuildStructArrayFieldPrefix returns the metadata key prefix under which all
// struct-array fields of the given collection are stored.
func BuildStructArrayFieldPrefix(collectionID typeutil.UniqueID) string {
	return fmt.Sprintf("%s/%d", StructArrayFieldMetaPrefix, collectionID)
}

// BuildStructArrayFieldKey returns the metadata key for a single struct-array
// field of the given collection.
func BuildStructArrayFieldKey(collectionID typeutil.UniqueID, fieldID int64) string {
	return fmt.Sprintf("%s/%d", BuildStructArrayFieldPrefix(collectionID), fieldID)
}

func BuildAliasKey210(alias string) string {
return fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix210, alias)
}
Expand Down Expand Up @@ -209,6 +217,17 @@
kvs[k] = string(v)
}

// save struct array fields to new path
for _, structArrayField := range coll.StructArrayFields {
k := BuildStructArrayFieldKey(coll.CollectionID, structArrayField.FieldID)
structArrayFieldInfo := model.MarshalStructArrayFieldModel(structArrayField)
v, err := proto.Marshal(structArrayFieldInfo)
if err != nil {
return err
}

Check warning on line 227 in internal/metastore/kv/rootcoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/rootcoord/kv_catalog.go#L226-L227

Added lines #L226 - L227 were not covered by tests
kvs[k] = string(v)
}

// save functions info to new path.
for _, function := range coll.Functions {
k := BuildFunctionKey(coll.CollectionID, function.ID)
Expand Down Expand Up @@ -389,7 +408,7 @@
}

// fieldVersionAfter210 reports whether the collection schema carries no inline
// fields and no inline struct-array fields, i.e. the meta uses the layout in
// which fields are persisted under separate keys (post-2.1.0).
func fieldVersionAfter210(collMeta *pb.CollectionInfo) bool {
	schema := collMeta.GetSchema()
	return len(schema.GetFields()) == 0 && len(schema.GetStructArrayFields()) == 0
}

func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Field, error) {
Expand Down Expand Up @@ -436,6 +455,24 @@
return ret, nil
}

func (kc *Catalog) listStructArrayFieldsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.StructArrayField, error) {
prefix := BuildStructArrayFieldPrefix(collectionID)
_, values, err := kc.Snapshot.LoadWithPrefix(ctx, prefix, ts)
if err != nil {
return nil, err
}

Check warning on line 463 in internal/metastore/kv/rootcoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/rootcoord/kv_catalog.go#L462-L463

Added lines #L462 - L463 were not covered by tests
structFields := make([]*model.StructArrayField, 0, len(values))
for _, v := range values {
partitionMeta := &schemapb.StructArrayFieldSchema{}
err := proto.Unmarshal([]byte(v), partitionMeta)
if err != nil {
return nil, err
}
structFields = append(structFields, model.UnmarshalStructArrayFieldModel(partitionMeta))

Check warning on line 471 in internal/metastore/kv/rootcoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/rootcoord/kv_catalog.go#L466-L471

Added lines #L466 - L471 were not covered by tests
}
return structFields, nil
}

func (kc *Catalog) listFunctions(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Function, error) {
prefix := BuildFunctionPrefix(collectionID)
_, values, err := kc.Snapshot.LoadWithPrefix(ctx, prefix, ts)
Expand Down Expand Up @@ -499,6 +536,12 @@
}
collection.Fields = fields

structArrayFields, err := kc.listStructArrayFieldsAfter210(ctx, collection.CollectionID, ts)
if err != nil {
return nil, err
}

Check warning on line 542 in internal/metastore/kv/rootcoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/rootcoord/kv_catalog.go#L541-L542

Added lines #L541 - L542 were not covered by tests
collection.StructArrayFields = structArrayFields

functions, err := kc.listFunctions(ctx, collection.CollectionID, ts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -610,6 +653,9 @@
for _, field := range collectionInfo.Fields {
delMetakeysSnap = append(delMetakeysSnap, BuildFieldKey(collectionInfo.CollectionID, field.FieldID))
}
for _, structArrayField := range collectionInfo.StructArrayFields {
delMetakeysSnap = append(delMetakeysSnap, BuildStructArrayFieldKey(collectionInfo.CollectionID, structArrayField.FieldID))
}
for _, function := range collectionInfo.Functions {
delMetakeysSnap = append(delMetakeysSnap, BuildFunctionKey(collectionInfo.CollectionID, function.ID))
}
Expand Down Expand Up @@ -650,6 +696,7 @@
oldCollClone.State = newColl.State
oldCollClone.Properties = newColl.Properties
oldCollClone.Fields = newColl.Fields
oldCollClone.StructArrayFields = newColl.StructArrayFields
oldCollClone.UpdateTimestamp = newColl.UpdateTimestamp

newKey := BuildCollectionKey(newColl.DBID, oldColl.CollectionID)
Expand All @@ -670,7 +717,18 @@
}
saves[k] = string(v)
}

for _, structArrayField := range newColl.StructArrayFields {
k := BuildStructArrayFieldKey(newColl.CollectionID, structArrayField.FieldID)
structArrayFieldInfo := model.MarshalStructArrayFieldModel(structArrayField)
v, err := proto.Marshal(structArrayFieldInfo)
if err != nil {
return err
}
saves[k] = string(v)

Check warning on line 728 in internal/metastore/kv/rootcoord/kv_catalog.go

View check run for this annotation

Codecov / codecov/patch

internal/metastore/kv/rootcoord/kv_catalog.go#L722-L728

Added lines #L722 - L728 were not covered by tests
}
}

return etcd.SaveByBatchWithLimit(saves, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(ctx, partialKvs, ts)
})
Expand Down
48 changes: 47 additions & 1 deletion internal/metastore/kv/rootcoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ func TestCatalog_ListCollections(t *testing.T) {
}), ts).
Return([]string{"rootcoord/functions/1/1"}, []string{string(fcm)}, nil)

kv.On("LoadWithPrefix", mock.Anything, mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, StructArrayFieldMetaPrefix)
}), ts).
Return([]string{}, []string{}, nil)

kc := NewCatalog(nil, kv)
ret, err := kc.ListCollections(ctx, testDb, ts)
assert.NoError(t, err)
Expand Down Expand Up @@ -264,6 +270,12 @@ func TestCatalog_ListCollections(t *testing.T) {
}), ts).
Return([]string{"rootcoord/functions/1/1"}, []string{string(fcm)}, nil)

kv.On("LoadWithPrefix", mock.Anything, mock.MatchedBy(
func(prefix string) bool {
return strings.HasPrefix(prefix, StructArrayFieldMetaPrefix)
}), ts).
Return([]string{}, []string{}, nil)

kv.On("MultiSaveAndRemove", mock.Anything, mock.Anything, mock.Anything, ts).Return(nil)
kc := NewCatalog(nil, kv)

Expand Down Expand Up @@ -1287,7 +1299,7 @@ func TestCatalog_CreateCollection(t *testing.T) {
assert.NoError(t, err)
})

t.Run("create collection with function", func(t *testing.T) {
t.Run("create collection with function and struct array field", func(t *testing.T) {
mockSnapshot := newMockSnapshot(t, withMockSave(nil), withMockMultiSave(nil))
kc := NewCatalog(nil, mockSnapshot)
ctx := context.Background()
Expand All @@ -1311,6 +1323,23 @@ func TestCatalog_CreateCollection(t *testing.T) {
DataType: schemapb.DataType_SparseFloatVector,
},
},
StructArrayFields: []*model.StructArrayField{
{
Name: "test_struct",
Fields: []*model.Field{
{
Name: "sub_text",
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_VarChar,
},
{
Name: "sub_sparse",
DataType: schemapb.DataType_ArrayOfVector,
ElementType: schemapb.DataType_SparseFloatVector,
},
},
},
},
Functions: []*model.Function{
{
Name: "test",
Expand Down Expand Up @@ -1417,6 +1446,23 @@ func TestCatalog_DropCollection(t *testing.T) {
DataType: schemapb.DataType_SparseFloatVector,
},
},
StructArrayFields: []*model.StructArrayField{
{
Name: "test_struct",
Fields: []*model.Field{
{
Name: "sub_text",
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_VarChar,
},
{
Name: "sub_sparse",
DataType: schemapb.DataType_ArrayOfVector,
ElementType: schemapb.DataType_SparseFloatVector,
},
},
},
},
Functions: []*model.Function{
{
Name: "test",
Expand Down
9 changes: 5 additions & 4 deletions internal/metastore/kv/rootcoord/rootcoord_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ const (
// CollectionMetaPrefix prefix for collection meta
CollectionMetaPrefix = ComponentPrefix + "/collection"

PartitionMetaPrefix = ComponentPrefix + "/partitions"
AliasMetaPrefix = ComponentPrefix + "/aliases"
FieldMetaPrefix = ComponentPrefix + "/fields"
FunctionMetaPrefix = ComponentPrefix + "/functions"
PartitionMetaPrefix = ComponentPrefix + "/partitions"
AliasMetaPrefix = ComponentPrefix + "/aliases"
FieldMetaPrefix = ComponentPrefix + "/fields"
StructArrayFieldMetaPrefix = ComponentPrefix + "/struct-array-fields"
FunctionMetaPrefix = ComponentPrefix + "/functions"

// CollectionAliasMetaPrefix210 prefix for collection alias meta
CollectionAliasMetaPrefix210 = ComponentPrefix + "/collection-alias"
Expand Down
Loading
Loading