Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions src/paimon/global_index/lumina/lumina_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class LuminaInterfaceTest : public ::testing::Test {
void TearDown() override {}

void WriteAndFlush(const std::string& index_path,
const std::vector<::lumina::core::VectorId>& row_ids) const {
const std::vector<::lumina::core::vector_id_t>& row_ids) const {
auto fs = std::make_shared<LocalFileSystem>();
std::shared_ptr<MemoryPool> paimon_pool = GetMemoryPool();
auto pool = std::make_shared<LuminaMemoryPool>(paimon_pool);
Expand All @@ -43,8 +43,8 @@ class LuminaInterfaceTest : public ::testing::Test {
.Set(::lumina::core::kEncodingType, ::lumina::core::kEncodingRawf32);
auto builder_result =
::lumina::api::LuminaBuilder::Create(builder_options, memory_resource);
ASSERT_TRUE(builder_result.status.IsOk()) << builder_result.status.Message();
auto writer = std::move(builder_result.value);
ASSERT_TRUE(builder_result.IsOk()) << builder_result.GetStatus().Message();
auto writer = std::move(builder_result).TakeValue();
// pretrain
ASSERT_TRUE(writer.Pretrain(/*data=*/nullptr, /*n=*/0).IsOk());
// insert data
Expand All @@ -65,9 +65,9 @@ class LuminaInterfaceTest : public ::testing::Test {
}

void Search(const std::string& index_path, int32_t topk,
const std::vector<::lumina::core::VectorId>& expected_row_ids,
const std::vector<::lumina::core::vector_id_t>& expected_row_ids,
const std::vector<float>& expected_distances,
const std::function<bool(::lumina::core::VectorId id)>& filter = nullptr) const {
const std::function<bool(::lumina::core::vector_id_t id)>& filter = nullptr) const {
ASSERT_EQ(expected_row_ids.size(), expected_distances.size());
auto fs = std::make_shared<LocalFileSystem>();
std::shared_ptr<MemoryPool> paimon_pool = GetMemoryPool();
Expand All @@ -77,12 +77,11 @@ class LuminaInterfaceTest : public ::testing::Test {
// create reader
::lumina::api::SearcherOptions searcher_options;
searcher_options.Set(::lumina::core::kIndexType, ::lumina::core::kIndexTypeBruteforce)
.Set(::lumina::core::kDimension, 4)
.Set(::lumina::core::kSearchThreadCount, 10);
.Set(::lumina::core::kDimension, 4);
auto reader_result =
::lumina::api::LuminaSearcher::Create(searcher_options, memory_resource);
ASSERT_TRUE(reader_result.status.IsOk());
auto reader = std::move(reader_result.value);
ASSERT_TRUE(reader_result.IsOk());
auto reader = std::move(reader_result).TakeValue();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs->Open(index_path));
auto file_reader = std::make_unique<LuminaFileReader>(in);
ASSERT_TRUE(reader.Open(std::move(file_reader), ::lumina::api::IOOptions()).IsOk());
Expand All @@ -102,18 +101,20 @@ class LuminaInterfaceTest : public ::testing::Test {
if (parallel_number > 0) {
search_options.Set(::lumina::core::kSearchParallelNumber, parallel_number);
}
::lumina::core::Result<::lumina::api::LuminaSearcher::SearchResult> search_result;

if (!filter) {
search_result = reader.Search(query, search_options, *pool);
auto search_result = reader.Search(query, search_options, *pool);
ASSERT_TRUE(search_result.IsOk()) << search_result.GetStatus().Message();
CheckResult(search_result.Value().topk, expected_row_ids, expected_distances);
} else {
search_options.Set(::lumina::core::kSearchThreadSafeFilter, true);
::lumina::extensions::SearchWithFilterExtension reader_with_filter;
ASSERT_TRUE(reader.Attach(reader_with_filter).IsOk());
search_result =
auto search_result =
reader_with_filter.SearchWithFilter(query, filter, search_options, *pool);
ASSERT_TRUE(search_result.IsOk()) << search_result.GetStatus().Message();
CheckResult(search_result.Value().topk, expected_row_ids, expected_distances);
}
ASSERT_TRUE(search_result.status.IsOk()) << search_result.status.Message();
CheckResult(search_result.value.topk, expected_row_ids, expected_distances);

// TODO(xinyu.lxy): check memory paimon_pool, current memory use = query mem +
// reader mem
Expand All @@ -133,7 +134,7 @@ class LuminaInterfaceTest : public ::testing::Test {
}

void CheckResult(const std::vector<::lumina::api::LuminaSearcher::SearchHit>& search_result,
const std::vector<::lumina::core::VectorId>& expected_row_ids,
const std::vector<::lumina::core::vector_id_t>& expected_row_ids,
const std::vector<float>& expected_distances) const {
ASSERT_EQ(search_result.size(), expected_row_ids.size());
for (size_t i = 0; i < search_result.size(); i++) {
Expand All @@ -155,11 +156,11 @@ TEST_F(LuminaInterfaceTest, TestSimple) {
std::string index_path = dir->Str() + "/lumina_test.index";

// write index
std::vector<::lumina::core::VectorId> row_ids = {0l, 1l, 2l, 3l};
std::vector<::lumina::core::vector_id_t> row_ids = {0l, 1l, 2l, 3l};
WriteAndFlush(index_path, row_ids);

// read index
std::vector<::lumina::core::VectorId> expected_row_ids = {3l, 1l, 2l, 0l};
std::vector<::lumina::core::vector_id_t> expected_row_ids = {3l, 1l, 2l, 0l};
std::vector<float> expected_distances = {0.01f, 2.01f, 2.21f, 4.21f};
Search(index_path, /*topk=*/4, expected_row_ids, expected_distances);
}
Expand All @@ -169,11 +170,11 @@ TEST_F(LuminaInterfaceTest, TestWithDocIdGap) {
std::string index_path = dir->Str() + "/lumina_test.index";

// write index
std::vector<::lumina::core::VectorId> row_ids = {0l, 2l, 4l, 6l};
std::vector<::lumina::core::vector_id_t> row_ids = {0l, 2l, 4l, 6l};
WriteAndFlush(index_path, row_ids);

// read index
std::vector<::lumina::core::VectorId> expected_row_ids = {6l, 2l, 4l, 0l};
std::vector<::lumina::core::vector_id_t> expected_row_ids = {6l, 2l, 4l, 0l};
std::vector<float> expected_distances = {0.01f, 2.01f, 2.21f, 4.21f};
Search(index_path, /*topk=*/4, expected_row_ids, expected_distances);
}
Expand All @@ -183,11 +184,11 @@ TEST_F(LuminaInterfaceTest, TestWithSmallTopk) {
std::string index_path = dir->Str() + "/lumina_test.index";

// write index
std::vector<::lumina::core::VectorId> row_ids = {0l, 1l, 2l, 3l};
std::vector<::lumina::core::vector_id_t> row_ids = {0l, 1l, 2l, 3l};
WriteAndFlush(index_path, row_ids);

// read index
std::vector<::lumina::core::VectorId> expected_row_ids = {3l, 1l, 2l};
std::vector<::lumina::core::vector_id_t> expected_row_ids = {3l, 1l, 2l};
std::vector<float> expected_distances = {0.01f, 2.01f, 2.21f};
Search(index_path, /*topk=*/3, expected_row_ids, expected_distances);
}
Expand All @@ -197,20 +198,20 @@ TEST_F(LuminaInterfaceTest, TestWithFilter) {
std::string index_path = dir->Str() + "/lumina_test.index";

// write index
std::vector<::lumina::core::VectorId> row_ids = {0l, 1l, 2l, 3l};
std::vector<::lumina::core::vector_id_t> row_ids = {0l, 1l, 2l, 3l};
WriteAndFlush(index_path, row_ids);

// read index
{
std::vector<::lumina::core::VectorId> expected_row_ids = {1l, 2l};
std::vector<::lumina::core::vector_id_t> expected_row_ids = {1l, 2l};
std::vector<float> expected_distances = {2.01f, 2.21f};
auto filter = [](::lumina::core::VectorId id) -> bool { return id < 3; };
auto filter = [](::lumina::core::vector_id_t id) -> bool { return id < 3; };
Search(index_path, /*topk=*/2, expected_row_ids, expected_distances, filter);
}
{
std::vector<::lumina::core::VectorId> expected_row_ids = {1l, 2l, 0l};
std::vector<::lumina::core::vector_id_t> expected_row_ids = {1l, 2l, 0l};
std::vector<float> expected_distances = {2.01f, 2.21f, 4.21f};
auto filter = [](::lumina::core::VectorId id) -> bool { return id < 3; };
auto filter = [](::lumina::core::vector_id_t id) -> bool { return id < 3; };
Search(index_path, /*topk=*/4, expected_row_ids, expected_distances, filter);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/global_index/lumina/lumina_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class LuminaFileReader : public ::lumina::io::FileReader {
return PaimonToLuminaStatus(read_result.status());
}
if (static_cast<uint64_t>(read_result.value()) != current_read_size) {
return ::lumina::core::Status::Error(
return ::lumina::core::Status(
::lumina::core::ErrorCode::IoError,
fmt::format("expect read len {} mismatch actual read len {}", current_read_size,
read_result.value()));
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/global_index/lumina/lumina_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class LuminaFileWriter : public ::lumina::io::FileWriter {
return PaimonToLuminaStatus(write_result.status());
}
if (static_cast<uint64_t>(write_result.value()) != current_write_size) {
return ::lumina::core::Status::Error(
return ::lumina::core::Status(
::lumina::core::ErrorCode::IoError,
fmt::format("expect write len {} mismatch actual write len {}",
current_write_size, write_result.value()));
Expand Down
7 changes: 3 additions & 4 deletions src/paimon/global_index/lumina/lumina_global_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class LuminaDataset : public ::lumina::api::Dataset {

::lumina::core::Result<uint64_t> GetNextBatch(
std::vector<float>& vector_buffer,
std::vector<::lumina::core::VectorId>& id_buffer) override {
std::vector<::lumina::core::vector_id_t>& id_buffer) override {
if (cursor_ >= array_vec_.size()) {
return ::lumina::core::Result<uint64_t>::Ok(0);
}
Expand All @@ -189,7 +189,7 @@ class LuminaDataset : public ::lumina::api::Dataset {
uint32_t dimension_;
std::vector<std::shared_ptr<arrow::FloatArray>> array_vec_;
size_t cursor_ = 0;
::lumina::core::VectorId id_ = 0;
::lumina::core::vector_id_t id_ = 0;
};

LuminaIndexWriter::LuminaIndexWriter(const std::string& field_name,
Expand All @@ -208,7 +208,6 @@ LuminaIndexWriter::LuminaIndexWriter(const std::string& field_name,
io_options_(std::move(io_options)) {}

Status LuminaIndexWriter::AddBatch(::ArrowArray* arrow_array) {
// TODO(xinyu.lxy): may use async thread to read data and build index
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array,
arrow::ImportArray(arrow_array, arrow_type_));
if (array->null_count() != 0) {
Expand Down Expand Up @@ -297,7 +296,7 @@ Result<std::shared_ptr<VectorSearchGlobalIndexResult>> LuminaIndexReader::VisitV
} else {
search_options.Set(::lumina::core::kSearchThreadSafeFilter, true);
auto lumina_filter = [filter = vector_search->pre_filter](
::lumina::core::VectorId id) -> bool { return filter(id); };
::lumina::core::vector_id_t id) -> bool { return filter(id); };
PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA(
search_result, searcher_with_filter_->SearchWithFilter(lumina_query, lumina_filter,
search_options, *pool_));
Expand Down
16 changes: 8 additions & 8 deletions src/paimon/global_index/lumina/lumina_global_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ class LuminaGlobalIndexTest : public ::testing::Test {
private:
std::shared_ptr<MemoryPool> pool_ = GetDefaultPool();
std::shared_ptr<FileSystem> fs_ = std::make_shared<LocalFileSystem>();
std::map<std::string, std::string> options_ = {{"lumina.dimension", "4"},
{"lumina.indextype", "bruteforce"},
std::map<std::string, std::string> options_ = {{"lumina.index.dimension", "4"},
{"lumina.index.type", "bruteforce"},
{"lumina.distance.metric", "l2"},
{"lumina.encoding.type", "encoding.rawf32"},
{"lumina.search.threadcount", "10"}};
{"lumina.encoding.type", "rawf32"},
{"lumina.search.thread_count", "10"}};
std::shared_ptr<arrow::DataType> data_type_ =
arrow::struct_({arrow::field("f0", arrow::list(arrow::float32()))});
std::shared_ptr<arrow::Array> array_ = arrow::ipc::internal::json::ArrayFromJSON(data_type_,
Expand Down Expand Up @@ -250,15 +250,15 @@ TEST_F(LuminaGlobalIndexTest, TestInvalidInputs) {
{
// invalid options
std::map<std::string, std::string> options = options_;
options["lumina.dimension"] = "xxx";
options["lumina.index.dimension"] = "xxx";
ASSERT_NOK_WITH_MSG(
WriteGlobalIndex(index_root, data_type_, options, /*array=*/nullptr, Range(0, 0)),
"convert key lumina.dimension, value xxx to unsigned int failed");
"convert key lumina.index.dimension, value xxx to unsigned int failed");
GlobalIndexIOMeta fake_meta("fake_file_name", /*file_size=*/10,
/*range_end=*/5,
/*metadata=*/nullptr);
ASSERT_NOK_WITH_MSG(CreateGlobalIndexReader(index_root, data_type_, options, fake_meta),
"convert key lumina.dimension, value xxx to unsigned int failed");
"convert key lumina.index.dimension, value xxx to unsigned int failed");
}
{
// invalid inputs in write
Expand Down Expand Up @@ -367,7 +367,7 @@ TEST_F(LuminaGlobalIndexTest, TestInvalidInputs) {
}
{
std::map<std::string, std::string> options = options_;
options["lumina.dimension"] = "5";
options["lumina.index.dimension"] = "5";
ASSERT_NOK_WITH_MSG(CreateGlobalIndexReader(index_root, data_type_, options, meta),
"lumina index dimension 4 mismatch dimension 5 in options");
}
Expand Down
31 changes: 14 additions & 17 deletions src/paimon/global_index/lumina/lumina_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ namespace paimon::lumina {
} \
} while (false)

#define PAIMON_ASSIGN_OR_RAISE_IMPL_FROM_LUMINA(result_name, lhs, rexpr) \
auto&& result_name = (rexpr); \
PAIMON_RETURN_IF_(!(result_name).IsOk(), LuminaToPaimonStatus((result_name).status), \
PAIMON_STRINGIFY(rexpr)); \
lhs = std::move(result_name.value);
#define PAIMON_ASSIGN_OR_RAISE_IMPL_FROM_LUMINA(result_name, lhs, rexpr) \
auto&& result_name = (rexpr); \
PAIMON_RETURN_IF_(!(result_name).IsOk(), LuminaToPaimonStatus((result_name).GetStatus()), \
PAIMON_STRINGIFY(rexpr)); \
lhs = std::move(result_name).TakeValue();

#define PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA(lhs, rexpr) \
PAIMON_ASSIGN_OR_RAISE_IMPL_FROM_LUMINA( \
Expand All @@ -45,23 +45,20 @@ inline ::lumina::core::Status PaimonToLuminaStatus(const Status& status) {
case StatusCode::OK:
return ::lumina::core::Status::Ok();
case StatusCode::OutOfMemory:
return ::lumina::core::Status::Error(::lumina::core::ErrorCode::OutOfMemory,
status.message());
return ::lumina::core::Status(::lumina::core::ErrorCode::OutOfMemory, status.message());
case StatusCode::IOError:
return ::lumina::core::Status::Error(::lumina::core::ErrorCode::IoError,
status.message());
return ::lumina::core::Status(::lumina::core::ErrorCode::IoError, status.message());
case StatusCode::NotImplemented:
return ::lumina::core::Status::Error(::lumina::core::ErrorCode::NotSupported,
status.message());
return ::lumina::core::Status(::lumina::core::ErrorCode::NotSupported,
status.message());
case StatusCode::NotExist:
return ::lumina::core::Status::Error(::lumina::core::ErrorCode::NotFound,
status.message());
return ::lumina::core::Status(::lumina::core::ErrorCode::NotFound, status.message());
case StatusCode::Exist:
return ::lumina::core::Status::Error(::lumina::core::ErrorCode::AlreadyExists,
status.message());
return ::lumina::core::Status(::lumina::core::ErrorCode::AlreadyExists,
status.message());
default:
return ::lumina::core::Status::Error(::lumina::core::ErrorCode::InvalidArgument,
status.message());
return ::lumina::core::Status(::lumina::core::ErrorCode::InvalidArgument,
status.message());
}
}

Expand Down
Loading
Loading