Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/cockroachdb/errors v1.9.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.4-0.20251124031301-fbb0e90ed771
github.com/milvus-io/milvus/pkg/v2 v2.6.3
github.com/quasilyte/go-ruleguard/dsl v0.3.23
github.com/samber/lo v1.27.0
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -46,7 +46,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down Expand Up @@ -119,3 +119,5 @@ require (
k8s.io/apimachinery v0.32.3 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/milvus-io/milvus/pkg/v2 => ../pkg
10 changes: 4 additions & 6 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 h1:9Nu54bhS/H/Kgo2/7xNSUuC5G28VR8ljfrLKU2G4IjU=
github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12/go.mod h1:TBzl5BIHNXfS9+C35ZyJaklL7mLDbgUkcgXzSLa8Tk0=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down Expand Up @@ -330,10 +330,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256 h1:M2waty0w2k4YT2HHzJk3fx6EFPD4DKxNJatitIV+gGU=
github.com/milvus-io/milvus/pkg/v2 v2.6.4-0.20251104142533-a2ce70d25256/go.mod h1:HT6Wxahwj/l8+i+D/C3iwDzCjDa36U9gyVw6CjjK4pE=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.4-0.20251124031301-fbb0e90ed771 h1:OKFR5ohjXN/75FdS2kt6L36F7NPk0y95y5ZZ8VHOtvI=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.4-0.20251124031301-fbb0e90ed771/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.4-0.20251124031301-fbb0e90ed771
github.com/minio/minio-go/v7 v7.0.73
github.com/panjf2000/ants/v2 v2.11.3 // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect
Expand Down Expand Up @@ -74,7 +74,7 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/magiconair/properties v1.8.7
github.com/milvus-io/milvus/client/v2 v2.0.0-00010101000000-000000000000
github.com/milvus-io/milvus/pkg/v2 v2.6.4
github.com/milvus-io/milvus/pkg/v2 v2.6.3
github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0
github.com/shirou/gopsutil/v4 v4.24.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -798,8 +798,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1 h1:nSZoftB+vB285AwYAOoJnwxKPMhP7l0p+VurCJGG9ds=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.6-0.20251119054300-fcb3986f4af1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.4-0.20251124031301-fbb0e90ed771 h1:OKFR5ohjXN/75FdS2kt6L36F7NPk0y95y5ZZ8VHOtvI=
github.com/milvus-io/milvus-proto/go-api/v2 v2.6.4-0.20251124031301-fbb0e90ed771/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
Expand Down
268 changes: 268 additions & 0 deletions internal/core/src/common/ArrayOffsets.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ArrayOffsets.h"
#include "segcore/SegmentInterface.h"
#include "log/Log.h"
#include "common/EasyAssert.h"

namespace milvus {

std::pair<int64_t, int64_t>
ArrayOffsetsSealed::ElementIDToRowID(int64_t elem_id) const {
assert(elem_id >= 0 && elem_id < GetTotalElementCount());

int32_t row_id = element_row_ids_[elem_id];
// Compute elem_idx: elem_idx = elem_id - start_of_this_row
int32_t elem_idx = elem_id - row_to_element_start_[row_id];
return {row_id, elem_idx};
}

std::pair<int64_t, int64_t>
ArrayOffsetsSealed::ElementIDRangeOfRow(int64_t row_id) const {
int64_t row_count = GetRowCount();
assert(row_id >= 0 && row_id <= row_count);

if (row_id == row_count) {
auto total = row_to_element_start_[row_count];
return {total, total};
}
return {row_to_element_start_[row_id], row_to_element_start_[row_id + 1]};
}

std::pair<TargetBitmap, TargetBitmap>
ArrayOffsetsSealed::RowBitsetToElementBitset(
const TargetBitmapView& row_bitset,
const TargetBitmapView& valid_row_bitset) const {
int64_t row_count = GetRowCount();
int64_t element_count = GetTotalElementCount();
TargetBitmap element_bitset(element_count);
TargetBitmap valid_element_bitset(element_count);

for (int64_t row_id = 0; row_id < row_count; ++row_id) {
int64_t start = row_to_element_start_[row_id];
int64_t end = row_to_element_start_[row_id + 1];
if (start < end) {
element_bitset.set(start, end - start, row_bitset[row_id]);
valid_element_bitset.set(
start, end - start, valid_row_bitset[row_id]);
}
}

return {std::move(element_bitset), std::move(valid_element_bitset)};
}
Comment on lines +46 to +66
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::pair<TargetBitmap, TargetBitmap>
ArrayOffsetsSealed::RowBitsetToElementBitset(
const TargetBitmapView& row_bitset,
const TargetBitmapView& valid_row_bitset) const {
int64_t row_count = GetRowCount();
int64_t element_count = GetTotalElementCount();
TargetBitmap element_bitset(element_count);
TargetBitmap valid_element_bitset(element_count);
for (int64_t row_id = 0; row_id < row_count; ++row_id) {
int64_t start = row_to_element_start_[row_id];
int64_t end = row_to_element_start_[row_id + 1];
if (start < end) {
element_bitset.set(start, end - start, row_bitset[row_id]);
valid_element_bitset.set(
start, end - start, valid_row_bitset[row_id]);
}
}
return {std::move(element_bitset), std::move(valid_element_bitset)};
}
std::pair<TargetBitmap, TargetBitmap>
ArrayOffsetsSealed::RowBitsetToElementBitset(
const TargetBitmapView& row_bitset,
const TargetBitmapView& valid_row_bitset) const {
int64_t row_count = GetRowCount();
int64_t element_count = GetTotalElementCount();
TargetBitmap element_bitset(element_count, false);
TargetBitmap valid_element_bitset(element_count, false);
if (row_count == 0) {
return {std::move(element_bitset), std::move(valid_element_bitset)};
}
auto flush = [&](TargetBitmap& bitset,
int64_t start_row,
int64_t end_row,
bool val) {
if (!val) {
return;
}
int64_t start_elem = row_to_element_start_[start_row];
int64_t end_elem = row_to_element_start_[end_row];
if (end_elem > start_elem) {
bitset.set(start_elem, end_elem - start_elem, val);
}
};
bool last_val = row_bitset[0];
int64_t last_val_idx = 0;
bool last_valid = valid_row_bitset[0];
int64_t last_valid_idx = 0;
int64_t i = 1;
while (i < row_count) {
bool val = row_bitset[i];
bool valid = valid_row_bitset[i];
if (val != last_val) {
flush(element_bitset, last_val_idx, i, last_val);
last_val = val;
last_val_idx = i;
}
if (valid != last_valid) {
flush(valid_element_bitset, last_valid_idx, i, last_valid);
last_valid = valid;
last_valid_idx = i;
}
++i;
}
flush(element_bitset, last_val_idx, row_count, last_val);
flush(valid_element_bitset, last_valid_idx, row_count, last_valid);
return {std::move(element_bitset), std::move(valid_element_bitset)};
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this optimize necessary? It's more comptex and involves more prediction. In some cases, it optimize a lot but in some cases such as 0, 1, 0, 1, 0, 1 it's negative optimize.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do a little benchmark, based on various 0/1 distributions.


ArrayOffsetsSealed
ArrayOffsetsSealed::BuildFromSegment(const void* segment,
const FieldMeta& field_meta) {
auto seg = static_cast<const segcore::SegmentInternalInterface*>(segment);

int64_t row_count = seg->get_row_count();
if (row_count == 0) {
LOG_INFO(
"ArrayOffsetsSealed::BuildFromSegment: empty segment for struct "
"'{}'",
field_meta.get_name().get());
return ArrayOffsetsSealed();
}

FieldId field_id = field_meta.get_id();
auto data_type = field_meta.get_data_type();

std::vector<int32_t> element_row_ids;
// Size is row_count + 1, last element stores total_element_count
std::vector<int32_t> row_to_element_start(row_count + 1);

auto temp_op_ctx = std::make_unique<OpContext>();
auto op_ctx_ptr = temp_op_ctx.get();

int64_t num_chunks = seg->num_chunk(field_id);
int32_t current_row_id = 0;

if (data_type == DataType::VECTOR_ARRAY) {
for (int64_t chunk_id = 0; chunk_id < num_chunks; ++chunk_id) {
auto pin_wrapper = seg->chunk_view<VectorArrayView>(
op_ctx_ptr, field_id, chunk_id);
const auto& [vector_array_views, valid_flags] = pin_wrapper.get();

for (size_t i = 0; i < vector_array_views.size(); ++i) {
int32_t array_len = 0;
if (valid_flags.empty() || valid_flags[i]) {
array_len = vector_array_views[i].length();
}

// Record the start position for this row
row_to_element_start[current_row_id] = element_row_ids.size();

// Add row_id for each element (elem_idx computed on access)
for (int32_t j = 0; j < array_len; ++j) {
element_row_ids.emplace_back(current_row_id);
}

current_row_id++;
}
}
} else {
for (int64_t chunk_id = 0; chunk_id < num_chunks; ++chunk_id) {
auto pin_wrapper =
seg->chunk_view<ArrayView>(op_ctx_ptr, field_id, chunk_id);
const auto& [array_views, valid_flags] = pin_wrapper.get();

for (size_t i = 0; i < array_views.size(); ++i) {
int32_t array_len = 0;
if (valid_flags.empty() || valid_flags[i]) {
array_len = array_views[i].length();
}

// Record the start position for this row
row_to_element_start[current_row_id] = element_row_ids.size();

// Add row_id for each element (elem_idx computed on access)
for (int32_t j = 0; j < array_len; ++j) {
element_row_ids.emplace_back(current_row_id);
}

current_row_id++;
}
}
}

// Store total element count as the last entry
row_to_element_start[row_count] = element_row_ids.size();

AssertInfo(current_row_id == row_count,
"Row count mismatch: expected {}, got {}",
row_count,
current_row_id);

int64_t total_elements = element_row_ids.size();

LOG_INFO(
"ArrayOffsetsSealed::BuildFromSegment: struct_name='{}', "
"field_id={}, row_count={}, total_elements={}",
field_meta.get_name().get(),
field_meta.get_id().get(),
row_count,
total_elements);

return ArrayOffsetsSealed(std::move(element_row_ids),
std::move(row_to_element_start));
}

std::pair<int64_t, int64_t>
ArrayOffsetsGrowing::ElementIDToRowID(int64_t elem_id) const {
std::shared_lock lock(mutex_);
assert(elem_id >= 0 &&
elem_id < static_cast<int64_t>(element_row_ids_.size()));
int32_t row_id = element_row_ids_[elem_id];
// Compute elem_idx: elem_idx = elem_id - start_of_this_row
int32_t elem_idx = elem_id - row_to_element_start_[row_id];
return {row_id, elem_idx};
}

std::pair<int64_t, int64_t>
ArrayOffsetsGrowing::ElementIDRangeOfRow(int64_t row_id) const {
std::shared_lock lock(mutex_);
assert(row_id >= 0 && row_id <= committed_row_count_);

if (row_id == committed_row_count_) {
auto total = row_to_element_start_[committed_row_count_];
return {total, total};
}
return {row_to_element_start_[row_id], row_to_element_start_[row_id + 1]};
}

std::pair<TargetBitmap, TargetBitmap>
ArrayOffsetsGrowing::RowBitsetToElementBitset(
const TargetBitmapView& row_bitset,
const TargetBitmapView& valid_row_bitset) const {
std::shared_lock lock(mutex_);

int64_t element_count = element_row_ids_.size();
TargetBitmap element_bitset(element_count);
TargetBitmap valid_element_bitset(element_count);

// Direct access to element_row_ids_, no virtual function calls
for (size_t elem_id = 0; elem_id < element_row_ids_.size(); ++elem_id) {
auto row_id = element_row_ids_[elem_id];
element_bitset[elem_id] = row_bitset[row_id];
valid_element_bitset[elem_id] = valid_row_bitset[row_id];
}

return {std::move(element_bitset), std::move(valid_element_bitset)};
}

void
ArrayOffsetsGrowing::Insert(int64_t row_id_start,
const int32_t* array_lengths,
int64_t count) {
std::unique_lock lock(mutex_);

row_to_element_start_.reserve(row_id_start + count + 1);

for (int64_t i = 0; i < count; ++i) {
int32_t row_id = row_id_start + i;
int32_t array_len = array_lengths[i];

if (row_id == committed_row_count_) {
// Record the start position for this row
row_to_element_start_.push_back(element_row_ids_.size());

// Add row_id for each element (elem_idx computed on access)
for (int32_t j = 0; j < array_len; ++j) {
element_row_ids_.emplace_back(row_id);
}

committed_row_count_++;
} else {
pending_rows_[row_id] = {row_id, array_len};
}
}

DrainPendingRows();

// Update the sentinel (total element count)
if (row_to_element_start_.size() ==
static_cast<size_t>(committed_row_count_)) {
row_to_element_start_.push_back(element_row_ids_.size());
} else {
row_to_element_start_[committed_row_count_] = element_row_ids_.size();
}
}

void
ArrayOffsetsGrowing::DrainPendingRows() {
while (true) {
auto it = pending_rows_.find(committed_row_count_);
if (it == pending_rows_.end()) {
break;
}

const auto& pending = it->second;

row_to_element_start_.push_back(element_row_ids_.size());

for (int32_t j = 0; j < pending.array_len; ++j) {
element_row_ids_.emplace_back(static_cast<int32_t>(pending.row_id));
}

committed_row_count_++;

pending_rows_.erase(it);
}
}

} // namespace milvus
Loading