Skip to content

Commit cd05b51

Browse files
committed
fix tests
Signed-off-by: SpadeA <[email protected]>
1 parent efb3175 commit cd05b51

6 files changed

Lines changed: 170 additions & 35 deletions

File tree

internal/core/src/index/IndexFactory.cpp

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,42 @@ IndexFactory::VecIndexLoadResource(
234234
num_rows,
235235
dim,
236236
config);
237-
has_raw_data =
238-
knowhere::IndexStaticFaced<knowhere::fp32>::HasRawData(
239-
index_type, index_version, config);
237+
break;
238+
case milvus::DataType::VECTOR_FLOAT16:
239+
resource = knowhere::IndexStaticFaced<knowhere::fp16>::
240+
EstimateLoadResource(index_type,
241+
index_version,
242+
index_size_in_bytes,
243+
num_rows,
244+
dim,
245+
config);
246+
break;
247+
case milvus::DataType::VECTOR_BFLOAT16:
248+
resource = knowhere::IndexStaticFaced<knowhere::bf16>::
249+
EstimateLoadResource(index_type,
250+
index_version,
251+
index_size_in_bytes,
252+
num_rows,
253+
dim,
254+
config);
255+
break;
256+
case milvus::DataType::VECTOR_BINARY:
257+
resource = knowhere::IndexStaticFaced<knowhere::bin1>::
258+
EstimateLoadResource(index_type,
259+
index_version,
260+
index_size_in_bytes,
261+
num_rows,
262+
dim,
263+
config);
264+
break;
265+
case milvus::DataType::VECTOR_INT8:
266+
resource = knowhere::IndexStaticFaced<knowhere::int8>::
267+
EstimateLoadResource(index_type,
268+
index_version,
269+
index_size_in_bytes,
270+
num_rows,
271+
dim,
272+
config);
240273
break;
241274

242275
default:
@@ -247,6 +280,9 @@ IndexFactory::VecIndexLoadResource(
247280
element_type);
248281
return LoadResourceRequest{0, 0, 0, 0, true};
249282
}
283+
// For VectorArray, has_raw_data is always false as get_vector of index does not provide offsets which
284+
// are required for reconstructing the raw data
285+
has_raw_data = false;
250286
break;
251287
}
252288
default:

internal/core/src/mmap/ChunkedColumn.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ class ChunkedColumnBase : public ChunkedColumnInterface {
282282
"VectorArrayViews only supported for ChunkedVectorArrayColumn");
283283
}
284284

285-
virtual PinWrapper<const size_t*>
285+
PinWrapper<const size_t*>
286286
VectorArrayOffsets(milvus::OpContext* op_ctx,
287287
int64_t chunk_id) const override {
288288
ThrowInfo(

internal/util/importutilv2/parquet/field_reader.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1845,34 +1845,49 @@ func ReadVectorArrayData(pcr *FieldReader, count int64) (any, error) {
18451845
if chunk.NullN() > 0 {
18461846
return nil, WrapNullRowErr(pcr.field)
18471847
}
1848+
// ArrayOfVector is stored as list of fixed size binary
18481849
listReader, ok := chunk.(*array.List)
18491850
if !ok {
18501851
return nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
18511852
}
1852-
listFloat32Reader, ok := listReader.ListValues().(*array.Float32)
1853+
1854+
fixedBinaryReader, ok := listReader.ListValues().(*array.FixedSizeBinary)
18531855
if !ok {
1854-
return nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
1856+
return nil, WrapTypeErr(pcr.field, listReader.ListValues().DataType().Name())
1857+
}
1858+
1859+
// Check that each vector has the correct byte size (dim * 4 bytes for float32)
1860+
expectedByteSize := int(dim) * 4
1861+
actualByteSize := fixedBinaryReader.DataType().(*arrow.FixedSizeBinaryType).ByteWidth
1862+
if actualByteSize != expectedByteSize {
1863+
return nil, merr.WrapErrImportFailed(fmt.Sprintf("vector byte size mismatch: expected %d, got %d for field '%s'",
1864+
expectedByteSize, actualByteSize, pcr.field.GetName()))
18551865
}
1866+
18561867
offsets := listReader.Offsets()
18571868
for i := 1; i < len(offsets); i++ {
18581869
start, end := offsets[i-1], offsets[i]
1859-
floatCount := end - start
1860-
if floatCount%int32(dim) != 0 {
1861-
return nil, merr.WrapErrImportFailed(fmt.Sprintf("vectors in VectorArray should be aligned with dim: %d", dim))
1862-
}
1870+
vectorCount := end - start
18631871

1864-
arrLength := floatCount / int32(dim)
1865-
if err = common.CheckArrayCapacity(int(arrLength), maxCapacity, pcr.field); err != nil {
1872+
if err = common.CheckArrayCapacity(int(vectorCount), maxCapacity, pcr.field); err != nil {
18661873
return nil, err
18671874
}
18681875

1869-
arrData := make([]float32, floatCount)
1870-
copy(arrData, listFloat32Reader.Float32Values()[start:end])
1876+
// Convert binary data to float32 array using arrow's built-in conversion
1877+
totalFloats := vectorCount * int32(dim)
1878+
floatData := make([]float32, totalFloats)
1879+
for j := int32(0); j < vectorCount; j++ {
1880+
vectorIndex := start + j
1881+
binaryData := fixedBinaryReader.Value(int(vectorIndex))
1882+
vectorFloats := arrow.Float32Traits.CastFromBytes(binaryData)
1883+
copy(floatData[j*int32(dim):(j+1)*int32(dim)], vectorFloats)
1884+
}
1885+
18711886
data = append(data, &schemapb.VectorField{
18721887
Dim: dim,
18731888
Data: &schemapb.VectorField_FloatVector{
18741889
FloatVector: &schemapb.FloatArray{
1875-
Data: arrData,
1890+
Data: floatData,
18761891
},
18771892
},
18781893
})

internal/util/importutilv2/parquet/util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ func isArrowDataTypeConvertible(src arrow.DataType, dst arrow.DataType, field *s
195195
return valid
196196
}
197197
return false
198+
case arrow.FIXED_SIZE_BINARY:
199+
return dstType == arrow.FIXED_SIZE_BINARY
198200
default:
199201
return false
200202
}

internal/util/testutil/test_util.go

Lines changed: 102 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package testutil
22

33
import (
4+
"encoding/binary"
45
"fmt"
56
"math"
67
"math/rand"
@@ -734,30 +735,113 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
734735
columns = append(columns, builder.NewListArray())
735736
}
736737
case schemapb.DataType_ArrayOfVector:
737-
data := insertData.Data[fieldID].(*storage.VectorArrayFieldData).Data
738-
rows := len(data)
738+
vectorArrayData := insertData.Data[fieldID].(*storage.VectorArrayFieldData)
739+
dim, err := typeutil.GetDim(field)
740+
if err != nil {
741+
return nil, err
742+
}
743+
elemType, err := storage.VectorArrayToArrowType(elementType, int(dim))
744+
if err != nil {
745+
return nil, err
746+
}
739747

740-
switch elementType {
741-
case schemapb.DataType_FloatVector:
742-
// ArrayOfVector is flattened in Arrow - just a list of floats
743-
// where total floats = dim * num_vectors
744-
builder := array.NewListBuilder(mem, &arrow.Float32Type{})
745-
valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
748+
// Create ListBuilder with "item" field name to match convertToArrowDataType
749+
// Always represented as a list of fixed-size binary values
750+
listBuilder := array.NewListBuilderWithField(mem, arrow.Field{
751+
Name: "item",
752+
Type: elemType,
753+
Nullable: true,
754+
Metadata: arrow.Metadata{},
755+
})
756+
fixedSizeBuilder, ok := listBuilder.ValueBuilder().(*array.FixedSizeBinaryBuilder)
757+
if !ok {
758+
return nil, fmt.Errorf("unexpected list value builder for VectorArray field %s: %T", field.GetName(), listBuilder.ValueBuilder())
759+
}
746760

747-
for i := 0; i < rows; i++ {
748-
vectorArray := data[i].GetFloatVector()
749-
if vectorArray == nil || len(vectorArray.GetData()) == 0 {
750-
builder.AppendNull()
761+
vectorArrayData.Dim = dim
762+
763+
bytesPerVector := int(fixedSizeBuilder.Type().(*arrow.FixedSizeBinaryType).ByteWidth)
764+
765+
appendBinarySlice := func(data []byte, stride int) error {
766+
if stride == 0 {
767+
return fmt.Errorf("zero stride for VectorArray field %s", field.GetName())
768+
}
769+
if len(data)%stride != 0 {
770+
return fmt.Errorf("vector array data length %d is not divisible by stride %d for field %s", len(data), stride, field.GetName())
771+
}
772+
for offset := 0; offset < len(data); offset += stride {
773+
fixedSizeBuilder.Append(data[offset : offset+stride])
774+
}
775+
return nil
776+
}
777+
778+
for _, vectorField := range vectorArrayData.Data {
779+
if vectorField == nil {
780+
listBuilder.Append(false)
781+
continue
782+
}
783+
784+
listBuilder.Append(true)
785+
786+
switch elementType {
787+
case schemapb.DataType_FloatVector:
788+
floatArray := vectorField.GetFloatVector()
789+
if floatArray == nil {
790+
return nil, fmt.Errorf("expected FloatVector data for field %s", field.GetName())
791+
}
792+
data := floatArray.GetData()
793+
if len(data) == 0 {
794+
continue
795+
}
796+
if len(data)%int(dim) != 0 {
797+
return nil, fmt.Errorf("float vector data length %d is not divisible by dim %d for field %s", len(data), dim, field.GetName())
798+
}
799+
for offset := 0; offset < len(data); offset += int(dim) {
800+
vectorBytes := make([]byte, bytesPerVector)
801+
for j := 0; j < int(dim); j++ {
802+
binary.LittleEndian.PutUint32(vectorBytes[j*4:], math.Float32bits(data[offset+j]))
803+
}
804+
fixedSizeBuilder.Append(vectorBytes)
805+
}
806+
case schemapb.DataType_BinaryVector:
807+
binaryData := vectorField.GetBinaryVector()
808+
if len(binaryData) == 0 {
809+
continue
810+
}
811+
bytesPer := int((dim + 7) / 8)
812+
if err := appendBinarySlice(binaryData, bytesPer); err != nil {
813+
return nil, err
814+
}
815+
case schemapb.DataType_Float16Vector:
816+
float16Data := vectorField.GetFloat16Vector()
817+
if len(float16Data) == 0 {
818+
continue
819+
}
820+
if err := appendBinarySlice(float16Data, int(dim)*2); err != nil {
821+
return nil, err
822+
}
823+
case schemapb.DataType_BFloat16Vector:
824+
bfloat16Data := vectorField.GetBfloat16Vector()
825+
if len(bfloat16Data) == 0 {
751826
continue
752827
}
753-
builder.Append(true)
754-
// Append all flattened vector data
755-
valueBuilder.AppendValues(vectorArray.GetData(), nil)
828+
if err := appendBinarySlice(bfloat16Data, int(dim)*2); err != nil {
829+
return nil, err
830+
}
831+
case schemapb.DataType_Int8Vector:
832+
int8Data := vectorField.GetInt8Vector()
833+
if len(int8Data) == 0 {
834+
continue
835+
}
836+
if err := appendBinarySlice(int8Data, int(dim)); err != nil {
837+
return nil, err
838+
}
839+
default:
840+
return nil, fmt.Errorf("unsupported element type in VectorArray: %s", elementType.String())
756841
}
757-
columns = append(columns, builder.NewListArray())
758-
default:
759-
return nil, fmt.Errorf("unsupported element type in VectorArray: %s", elementType.String())
760842
}
843+
844+
columns = append(columns, listBuilder.NewListArray())
761845
}
762846
}
763847
return columns, nil

tests/integration/util_index.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@ func (s *MiniClusterSuite) waitForIndexBuiltInternal(ctx context.Context, dbName
7070
return true
7171
}
7272
for _, desc := range resp.GetIndexDescriptions() {
73-
fmt.Println("debug=== Index description", desc.String())
74-
7573
if desc.GetFieldName() == field {
7674
switch desc.GetState() {
7775
case commonpb.IndexState_Finished:

0 commit comments

Comments
 (0)