Skip to content

Commit cc5ba77

Browse files
committed
segcore impl doen
Signed-off-by: SpadeA <[email protected]>
1 parent 8af7490 commit cc5ba77

53 files changed

Lines changed: 3665 additions & 335 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "ArrayOffsets.h"
18+
#include "segcore/SegmentInterface.h"
19+
#include "log/Log.h"
20+
#include "common/EasyAssert.h"
21+
22+
namespace milvus {
23+
24+
namespace {
25+
ArrayOffsetsSealed
26+
BuildArrayOffsetsSealedFromColumn(
27+
const segcore::SegmentInternalInterface* segment,
28+
const FieldMeta& field_meta,
29+
int64_t row_count) {
30+
FieldId field_id = field_meta.get_id();
31+
auto data_type = field_meta.get_data_type();
32+
33+
ArrayOffsetsSealed result;
34+
result.doc_count = row_count;
35+
result.doc_to_element_range_.resize(row_count);
36+
37+
auto temp_op_ctx = std::make_unique<OpContext>();
38+
auto op_ctx_ptr = temp_op_ctx.get();
39+
40+
int64_t num_chunks = segment->num_chunk(field_id);
41+
int64_t current_doc_id = 0;
42+
43+
if (data_type == DataType::VECTOR_ARRAY) {
44+
for (int64_t chunk_id = 0; chunk_id < num_chunks; ++chunk_id) {
45+
auto pin_wrapper = segment->chunk_view<VectorArrayView>(
46+
op_ctx_ptr, field_id, chunk_id);
47+
const auto& [vector_array_views, valid_flags] = pin_wrapper.get();
48+
49+
for (size_t i = 0; i < vector_array_views.size(); ++i) {
50+
int64_t array_len = 0;
51+
if (valid_flags.empty() || valid_flags[i]) {
52+
array_len = vector_array_views[i].length();
53+
}
54+
55+
// Record the start position for this doc
56+
int64_t elem_start = result.element_info.size();
57+
58+
// Add (doc_id, element_index) for each element
59+
for (int32_t j = 0; j < array_len; ++j) {
60+
result.element_info.emplace_back(current_doc_id, j);
61+
}
62+
63+
int64_t elem_end = result.element_info.size();
64+
result.doc_to_element_range_[current_doc_id] = {elem_start,
65+
elem_end};
66+
67+
current_doc_id++;
68+
}
69+
}
70+
} else {
71+
for (int64_t chunk_id = 0; chunk_id < num_chunks; ++chunk_id) {
72+
auto pin_wrapper =
73+
segment->chunk_view<ArrayView>(op_ctx_ptr, field_id, chunk_id);
74+
const auto& [array_views, valid_flags] = pin_wrapper.get();
75+
76+
for (size_t i = 0; i < array_views.size(); ++i) {
77+
int64_t array_len = 0;
78+
if (valid_flags.empty() || valid_flags[i]) {
79+
array_len = array_views[i].length();
80+
}
81+
82+
// Record the start position for this doc
83+
int64_t elem_start = result.element_info.size();
84+
85+
// Add (doc_id, element_index) for each element
86+
for (int32_t j = 0; j < array_len; ++j) {
87+
result.element_info.emplace_back(current_doc_id, j);
88+
}
89+
90+
int64_t elem_end = result.element_info.size();
91+
result.doc_to_element_range_[current_doc_id] = {elem_start,
92+
elem_end};
93+
94+
current_doc_id++;
95+
}
96+
}
97+
}
98+
99+
AssertInfo(current_doc_id == row_count,
100+
"Document count mismatch: expected {}, got {}",
101+
row_count,
102+
current_doc_id);
103+
104+
return result;
105+
}
106+
107+
} // anonymous namespace
108+
109+
std::pair<int64_t, int64_t>
110+
ArrayOffsetsSealed::ElementIDToDoc(int64_t elem_id) const {
111+
assert(elem_id >= 0 && elem_id < GetTotalElementCount());
112+
113+
const auto& [doc_id, elem_idx] = element_info[elem_id];
114+
return {doc_id, elem_idx};
115+
}
116+
117+
std::pair<int64_t, int64_t>
118+
ArrayOffsetsSealed::DocIDToElementID(int64_t doc_id) const {
119+
assert(doc_id >= 0 && doc_id <= doc_count);
120+
121+
// If doc_id equals doc_count, return the total element count
122+
// This represents "past the last document"
123+
if (doc_id >= doc_count) {
124+
return {GetTotalElementCount(), GetTotalElementCount()};
125+
}
126+
127+
return doc_to_element_range_[doc_id];
128+
}
129+
130+
std::pair<int64_t, int64_t>
131+
ArrayOffsetsGrowing::ElementIDToDoc(int64_t elem_id) const {
132+
std::shared_lock lock(mutex_);
133+
assert(elem_id >= 0 &&
134+
elem_id < static_cast<int64_t>(element_info_.size()));
135+
const auto& [doc_id, elem_idx] = element_info_[elem_id];
136+
return {doc_id, elem_idx};
137+
}
138+
139+
std::pair<int64_t, int64_t>
140+
ArrayOffsetsGrowing::DocIDToElementID(int64_t doc_id) const {
141+
std::shared_lock lock(mutex_);
142+
assert(doc_id >= 0 && doc_id <= committed_doc_count_);
143+
144+
// If doc_id equals or exceeds committed_doc_count_, return the total element count
145+
// This represents "past the last document"
146+
if (doc_id >= committed_doc_count_) {
147+
return {static_cast<int64_t>(element_info_.size()),
148+
static_cast<int64_t>(element_info_.size())};
149+
}
150+
151+
return doc_to_element_range_[doc_id];
152+
}
153+
154+
ArrayOffsetsSealed
155+
ArrayOffsetsSealed::BuildFromSegment(const void* segment,
156+
const FieldMeta& field_meta) {
157+
auto seg = static_cast<const segcore::SegmentInternalInterface*>(segment);
158+
159+
int64_t row_count = seg->get_row_count();
160+
if (row_count == 0) {
161+
LOG_INFO(
162+
"ArrayOffsetsSealed::BuildFromSegment: empty segment for struct "
163+
"'{}'",
164+
field_meta.get_name().get());
165+
ArrayOffsetsSealed result;
166+
result.doc_count = 0;
167+
return result;
168+
}
169+
170+
ArrayOffsetsSealed result =
171+
BuildArrayOffsetsSealedFromColumn(seg, field_meta, row_count);
172+
173+
int64_t total_elements = result.GetTotalElementCount();
174+
175+
LOG_INFO(
176+
"ArrayOffsetsSealed::BuildFromSegment: struct_name='{}', "
177+
"field_id={}, row_count={}, total_elements={}",
178+
field_meta.get_name().get(),
179+
field_meta.get_id().get(),
180+
row_count,
181+
total_elements);
182+
183+
return result;
184+
}
185+
186+
void
187+
ArrayOffsetsGrowing::Insert(int64_t doc_id_start,
188+
const int32_t* array_lengths,
189+
int64_t count) {
190+
std::unique_lock lock(mutex_);
191+
192+
for (int64_t i = 0; i < count; ++i) {
193+
int64_t doc_id = doc_id_start + i;
194+
int32_t array_len = array_lengths[i];
195+
196+
if (doc_id == committed_doc_count_) {
197+
// Record the start position for this doc
198+
int64_t elem_start = element_info_.size();
199+
200+
for (int32_t j = 0; j < array_len; ++j) {
201+
element_info_.emplace_back(static_cast<int32_t>(doc_id), j);
202+
}
203+
204+
// Record the end position for this doc
205+
int64_t elem_end = element_info_.size();
206+
207+
// Ensure doc_to_element_range_ is large enough
208+
if (static_cast<int64_t>(doc_to_element_range_.size()) <= doc_id) {
209+
doc_to_element_range_.resize(doc_id + 1);
210+
}
211+
doc_to_element_range_[doc_id] = {elem_start, elem_end};
212+
213+
committed_doc_count_++;
214+
215+
// Try to drain pending documents
216+
DrainPendingDocs();
217+
} else {
218+
// Cache this document for later
219+
pending_docs_[doc_id] = {doc_id, array_len};
220+
}
221+
}
222+
}
223+
224+
void
225+
ArrayOffsetsGrowing::DrainPendingDocs() {
226+
while (true) {
227+
auto it = pending_docs_.find(committed_doc_count_);
228+
if (it == pending_docs_.end()) {
229+
break;
230+
}
231+
232+
// Commit this pending document
233+
const auto& pending = it->second;
234+
235+
// Record the start position for this doc
236+
int64_t elem_start = element_info_.size();
237+
238+
for (int32_t j = 0; j < pending.array_len; ++j) {
239+
element_info_.emplace_back(static_cast<int32_t>(pending.doc_id), j);
240+
}
241+
242+
// Record the end position for this doc
243+
int64_t elem_end = element_info_.size();
244+
245+
// Ensure doc_to_element_range_ is large enough
246+
if (static_cast<int64_t>(doc_to_element_range_.size()) <=
247+
pending.doc_id) {
248+
doc_to_element_range_.resize(pending.doc_id + 1);
249+
}
250+
doc_to_element_range_[pending.doc_id] = {elem_start, elem_end};
251+
252+
committed_doc_count_++;
253+
254+
// Remove from pending
255+
pending_docs_.erase(it);
256+
}
257+
}
258+
259+
} // namespace milvus

0 commit comments

Comments
 (0)