Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions include/paimon/global_index/global_index_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <string>
#include <utility>

#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/result.h"
#include "paimon/visibility.h"

Expand Down Expand Up @@ -62,6 +64,38 @@ class PAIMON_EXPORT GlobalIndexResult : public std::enable_shared_from_this<Glob
const std::shared_ptr<GlobalIndexResult>& other);

virtual std::string ToString() const = 0;

/// Serializes a GlobalIndexResult object into a byte array.
///
/// @note This method only supports the following concrete implementations:
/// - BitmapTopKGlobalIndexResult
/// - BitmapGlobalIndexResult
///
/// @param global_index_result The GlobalIndexResult instance to serialize (must not be null).
/// @param pool Memory pool used to allocate the output byte buffer.
/// @return A Result containing a unique pointer to the serialized Bytes on success,
/// or an error status on failure.
static Result<PAIMON_UNIQUE_PTR<Bytes>> Serialize(
const std::shared_ptr<GlobalIndexResult>& global_index_result,
const std::shared_ptr<MemoryPool>& pool);

/// Deserializes a GlobalIndexResult object from a raw byte buffer.
///
/// @note The concrete type of the deserialized object is determined by metadata
/// embedded in the buffer. Currently, only the following types are supported:
/// - BitmapTopKGlobalIndexResult
/// - BitmapGlobalIndexResult
///
/// @param buffer Pointer to the serialized byte data (must not be null).
/// @param length Size of the buffer in bytes.
/// @param pool Memory pool used to allocate internal objects during deserialization.
/// @return A Result containing a shared pointer to the reconstructed GlobalIndexResult
/// on success, or an error status on failure.
static Result<std::shared_ptr<GlobalIndexResult>> Deserialize(
const char* buffer, size_t length, const std::shared_ptr<MemoryPool>& pool);

private:
static constexpr int32_t VERSION = 1;
};

/// Represents the result of a Top-K query against a global index.
Expand Down
77 changes: 77 additions & 0 deletions src/paimon/common/global_index/global_index_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,30 @@

#include "paimon/global_index/global_index_result.h"

#include "fmt/format.h"
#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/global_index/bitmap_topk_global_index_result.h"
#include "paimon/io/byte_array_input_stream.h"
#include "paimon/io/data_input_stream.h"
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
namespace paimon {
namespace {
void WriteBitmapAndScores(const RoaringBitmap64* bitmap, const std::vector<float>& scores,
MemorySegmentOutputStream* out, MemoryPool* pool) {
std::shared_ptr<Bytes> bitmap_bytes = bitmap->Serialize(pool);
out->WriteValue<int32_t>(bitmap_bytes->size());
out->WriteBytes(bitmap_bytes);

out->WriteValue<int32_t>(scores.size());
for (auto score : scores) {
out->WriteValue<float>(score);
}
}

} // namespace
Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::And(
const std::shared_ptr<GlobalIndexResult>& other) {
auto supplier = [other, result = shared_from_this()]() -> Result<RoaringBitmap64> {
Expand Down Expand Up @@ -57,4 +79,59 @@ Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::Or(
};
return std::make_shared<BitmapGlobalIndexResult>(supplier);
}

Result<PAIMON_UNIQUE_PTR<Bytes>> GlobalIndexResult::Serialize(
const std::shared_ptr<GlobalIndexResult>& global_index_result,
const std::shared_ptr<MemoryPool>& pool) {
MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
out.WriteValue<int32_t>(VERSION);
if (auto bitmap_result =
std::dynamic_pointer_cast<BitmapGlobalIndexResult>(global_index_result)) {
PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, bitmap_result->GetBitmap());
WriteBitmapAndScores(bitmap, {}, &out, pool.get());
} else if (auto bitmap_topk_result =
std::dynamic_pointer_cast<BitmapTopKGlobalIndexResult>(global_index_result)) {
PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* bitmap, bitmap_topk_result->GetBitmap());
const auto& scores = bitmap_topk_result->GetScores();
WriteBitmapAndScores(bitmap, scores, &out, pool.get());
} else {
return Status::Invalid(
"invalid GlobalIndexResult, must be BitmapGlobalIndexResult or "
"BitmapTopkGlobalIndexResult");
}
return MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
}

Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::Deserialize(
const char* buffer, size_t length, const std::shared_ptr<MemoryPool>& pool) {
auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, length);
DataInputStream in(input_stream);
PAIMON_ASSIGN_OR_RAISE(int32_t version, in.ReadValue<int32_t>());
if (version != VERSION) {
return Status::Invalid(
fmt::format(fmt::format("invalid version {} for GlobalIndexResult", version)));
}
PAIMON_ASSIGN_OR_RAISE(int32_t bitmap_bytes_len, in.ReadValue<int32_t>());
auto bitmap_bytes = Bytes::AllocateBytes(bitmap_bytes_len, pool.get());
PAIMON_RETURN_NOT_OK(in.ReadBytes(bitmap_bytes.get()));
RoaringBitmap64 bitmap;
PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bitmap_bytes->data(), bitmap_bytes->size()));

PAIMON_ASSIGN_OR_RAISE(int32_t score_len, in.ReadValue<int32_t>());
if (score_len == 0) {
return std::make_shared<BitmapGlobalIndexResult>(
[bitmap]() -> Result<RoaringBitmap64> { return bitmap; });
}
if (score_len != bitmap.Cardinality()) {
return Status::Invalid("row id count mismatches score count");
}
std::vector<float> scores;
scores.reserve(score_len);
for (int32_t i = 0; i < score_len; i++) {
PAIMON_ASSIGN_OR_RAISE(float score, in.ReadValue<float>());
scores.push_back(score);
}
return std::make_shared<BitmapTopKGlobalIndexResult>(std::move(bitmap), std::move(scores));
}

} // namespace paimon
58 changes: 58 additions & 0 deletions src/paimon/common/global_index/global_index_result_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <utility>

#include "gtest/gtest.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/global_index/bitmap_topk_global_index_result.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {
Expand Down Expand Up @@ -75,4 +77,60 @@ TEST_F(GlobalIndexResultTest, TestSimple) {
ASSERT_OK_AND_ASSIGN(auto or_result, result1->Or(result2));
ASSERT_EQ(or_result->ToString(), "{1,3,4,5,100,200}");
}

TEST_F(GlobalIndexResultTest, TestSerializeAndDeserializeSimple) {
auto pool = GetDefaultPool();
std::vector<uint8_t> byte_buffer = {
0, 0, 0, 1, 0, 0, 0, 69, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 59,
48, 3, 0, 5, 0, 0, 5, 0, 255, 127, 0, 0, 0, 128, 2, 0, 245, 133, 0, 0, 37,
0, 0, 0, 47, 0, 0, 0, 49, 0, 0, 0, 55, 0, 0, 0, 2, 0, 1, 0, 4, 0,
10, 0, 0, 0, 255, 255, 1, 0, 0, 0, 2, 0, 255, 224, 0, 0, 0, 0};
ASSERT_OK_AND_ASSIGN(
std::shared_ptr<GlobalIndexResult> index_result,
GlobalIndexResult::Deserialize((char*)byte_buffer.data(), byte_buffer.size(), pool));
auto typed_result = std::dynamic_pointer_cast<BitmapGlobalIndexResult>(index_result);
ASSERT_TRUE(typed_result);

auto bitmap = RoaringBitmap64::From(
{1l, 2l, 3l, 4l, 5l, 10l, 2247483647l, 2147483647l, 2147483648l, 2147483649l, 2147483650l});
auto expected_result = std::make_shared<BitmapGlobalIndexResult>(
[bitmap]() -> Result<RoaringBitmap64> { return bitmap; });
ASSERT_EQ(expected_result->ToString(), typed_result->ToString());
ASSERT_OK_AND_ASSIGN(auto serialize_bytes, GlobalIndexResult::Serialize(index_result, pool));
ASSERT_EQ(byte_buffer, std::vector<uint8_t>(serialize_bytes->data(),
serialize_bytes->data() + serialize_bytes->size()));
}

TEST_F(GlobalIndexResultTest, TestSerializeAndDeserializeWithScore) {
auto pool = GetDefaultPool();
std::vector<uint8_t> byte_buffer = {
0, 0, 0, 1, 0, 0, 0, 64, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
58, 48, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 255, 127, 0, 0, 0, 128, 2, 0,
245, 133, 0, 0, 40, 0, 0, 0, 42, 0, 0, 0, 44, 0, 0, 0, 50, 0, 0, 0,
10, 0, 255, 255, 1, 0, 3, 0, 5, 0, 255, 224, 0, 0, 0, 6, 63, 129, 71, 174,
191, 168, 245, 195, 64, 135, 92, 41, 66, 74, 245, 195, 194, 200, 128, 0, 64, 6, 102, 102};
ASSERT_OK_AND_ASSIGN(
std::shared_ptr<GlobalIndexResult> index_result,
GlobalIndexResult::Deserialize((char*)byte_buffer.data(), byte_buffer.size(), pool));
auto typed_result = std::dynamic_pointer_cast<BitmapTopKGlobalIndexResult>(index_result);
ASSERT_TRUE(typed_result);

auto bitmap = RoaringBitmap64::From(
{10l, 2147483647l, 2147483649l, 2147483651l, 2147483653l, 2247483647l});
std::vector<float> scores = {1.01f, -1.32f, 4.23f, 50.74f, -100.25f, 2.10f};
auto expected_result =
std::make_shared<BitmapTopKGlobalIndexResult>(std::move(bitmap), std::move(scores));
ASSERT_EQ(expected_result->ToString(), typed_result->ToString());
ASSERT_OK_AND_ASSIGN(auto serialize_bytes, GlobalIndexResult::Serialize(index_result, pool));
ASSERT_EQ(byte_buffer, std::vector<uint8_t>(serialize_bytes->data(),
serialize_bytes->data() + serialize_bytes->size()));
}

TEST_F(GlobalIndexResultTest, TestInvalidSerialize) {
auto pool = GetDefaultPool();
auto result = std::make_shared<FakeGlobalIndexResult>(std::vector<int64_t>({1, 3, 5, 100}));
ASSERT_NOK_WITH_MSG(GlobalIndexResult::Serialize(result, pool),
"invalid GlobalIndexResult, must be BitmapGlobalIndexResult or "
"BitmapTopkGlobalIndexResult");
}
} // namespace paimon::test
Loading