Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/paimon/common/data/generic_row.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ class GenericRow : public InternalRow {
}

void AddDataHolder(std::unique_ptr<InternalRow>&& holder) {
holders_.push_back(std::move(holder));
row_holder_.push_back(std::move(holder));
}

void AddDataHolder(const std::shared_ptr<Bytes>& bytes) {
bytes_holder_ = bytes;
}

bool GetBoolean(int32_t pos) const override {
Expand Down Expand Up @@ -218,8 +222,9 @@ class GenericRow : public InternalRow {
/// The array to store the actual internal format values.
std::vector<VariantType> fields_;
/// As GenericRow only holds string view for string data to avoid deep copy, original data must
/// be held in holders_
std::vector<std::unique_ptr<InternalRow>> holders_;
/// be held in row holders_ or bytes holder
std::vector<std::unique_ptr<InternalRow>> row_holder_;
std::shared_ptr<Bytes> bytes_holder_;
/// The kind of change that a row describes in a changelog.
const RowKind* kind_;
};
Expand Down
114 changes: 96 additions & 18 deletions src/paimon/common/data/serializer/row_compacted_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,102 @@ Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
schema, std::move(getters), std::move(writers), std::move(readers), pool));
}

Result<int32_t> RowCompactedSerializer::CompareField(const FieldInfo& field_info,
RowReader* reader1, RowReader* reader2) {
auto type = field_info.type_id;
switch (type) {
case arrow::Type::type::BOOL: {
auto val1 = reader1->ReadValue<bool>();
auto val2 = reader2->ReadValue<bool>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT8: {
auto val1 = reader1->ReadValue<char>();
auto val2 = reader2->ReadValue<char>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT16: {
auto val1 = reader1->ReadValue<int16_t>();
auto val2 = reader2->ReadValue<int16_t>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT32:
case arrow::Type::type::DATE32: {
auto val1 = reader1->ReadValue<int32_t>();
auto val2 = reader2->ReadValue<int32_t>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::INT64: {
auto val1 = reader1->ReadValue<int64_t>();
auto val2 = reader2->ReadValue<int64_t>();
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::FLOAT: {
auto val1 = reader1->ReadValue<float>();
auto val2 = reader2->ReadValue<float>();
return FieldsComparator::CompareFloatingPoint(val1, val2);
}
case arrow::Type::type::DOUBLE: {
auto val1 = reader1->ReadValue<double>();
auto val2 = reader2->ReadValue<double>();
return FieldsComparator::CompareFloatingPoint(val1, val2);
}
case arrow::Type::type::STRING:
case arrow::Type::type::BINARY: {
PAIMON_ASSIGN_OR_RAISE(std::string_view val1, reader1->ReadStringView());
PAIMON_ASSIGN_OR_RAISE(std::string_view val2, reader2->ReadStringView());
int32_t cmp = val1.compare(val2);
return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
}
case arrow::Type::type::TIMESTAMP: {
PAIMON_ASSIGN_OR_RAISE(Timestamp val1, reader1->ReadTimestamp(field_info.precision));
PAIMON_ASSIGN_OR_RAISE(Timestamp val2, reader2->ReadTimestamp(field_info.precision));
return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
}
case arrow::Type::type::DECIMAL: {
PAIMON_ASSIGN_OR_RAISE(Decimal val1,
reader1->ReadDecimal(field_info.precision, field_info.scale));
PAIMON_ASSIGN_OR_RAISE(Decimal val2,
reader2->ReadDecimal(field_info.precision, field_info.scale));
int32_t cmp = val1.CompareTo(val2);
return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
}
default:
return Status::NotImplemented(
fmt::format("Do not support comparing type {} in CompareField",
static_cast<int32_t>(field_info.type_id)));
}
}

Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparator(
const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
int32_t bit_set_in_bytes = RowCompactedSerializer::CalculateBitSetInBytes(schema->num_fields());
auto row_reader1 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
auto row_reader2 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
std::vector<RowCompactedSerializer::FieldReader> readers(schema->num_fields());
std::vector<FieldsComparator::VariantComparatorFunc> comparators(schema->num_fields());

std::vector<FieldInfo> field_infos(schema->num_fields());
for (int32_t i = 0; i < schema->num_fields(); i++) {
auto field_type = schema->field(i)->type();
PAIMON_ASSIGN_OR_RAISE(readers[i], CreateFieldReader(field_type, pool));
PAIMON_ASSIGN_OR_RAISE(comparators[i], FieldsComparator::CompareVariant(i, field_type));
field_infos[i].type_id = field_type->id();
if (field_type->id() == arrow::Type::type::TIMESTAMP) {
auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
assert(timestamp_type);
field_infos[i].precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
} else if (field_type->id() == arrow::Type::type::DECIMAL) {
auto decimal_type =
arrow::internal::checked_pointer_cast<arrow::Decimal128Type>(field_type);
assert(decimal_type);
field_infos[i].precision = decimal_type->precision();
field_infos[i].scale = decimal_type->scale();
}
}
auto comparator = [row_reader1, row_reader2, readers, comparators](
const std::shared_ptr<MemorySlice>& slice1,
const std::shared_ptr<MemorySlice>& slice2) -> Result<int32_t> {
row_reader1->PointTo(slice1->GetSegment(), slice1->Offset());
row_reader2->PointTo(slice2->GetSegment(), slice2->Offset());
for (int32_t i = 0; i < static_cast<int32_t>(readers.size()); i++) {

auto comparator = [row_reader1, row_reader2, field_infos](
const MemorySlice& slice1, const MemorySlice& slice2) -> Result<int32_t> {
row_reader1->PointTo(slice1.GetSegment(), slice1.Offset());
row_reader2->PointTo(slice2.GetSegment(), slice2.Offset());
for (int32_t i = 0; i < static_cast<int32_t>(field_infos.size()); i++) {
bool is_null1 = row_reader1->IsNullAt(i);
bool is_null2 = row_reader2->IsNullAt(i);
if (!is_null1 || !is_null2) {
Expand All @@ -68,9 +146,9 @@ Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparat
} else if (is_null2) {
return 1;
} else {
PAIMON_ASSIGN_OR_RAISE(VariantType field1, readers[i](i, row_reader1.get()));
PAIMON_ASSIGN_OR_RAISE(VariantType field2, readers[i](i, row_reader2.get()));
int32_t comp = comparators[i](field1, field2);
PAIMON_ASSIGN_OR_RAISE(
int32_t comp,
CompareField(field_infos[i], row_reader1.get(), row_reader2.get()));
if (comp != 0) {
return comp;
}
Expand All @@ -79,8 +157,7 @@ Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparat
}
return 0;
};
return std::function<Result<int32_t>(const std::shared_ptr<MemorySlice>&,
const std::shared_ptr<MemorySlice>&)>(comparator);
return std::function<Result<int32_t>(const MemorySlice&, const MemorySlice&)>(comparator);
}

Result<std::shared_ptr<Bytes>> RowCompactedSerializer::SerializeToBytes(const InternalRow& row) {
Expand Down Expand Up @@ -110,6 +187,7 @@ Result<std::unique_ptr<InternalRow>> RowCompactedSerializer::Deserialize(
PAIMON_ASSIGN_OR_RAISE(VariantType field, readers_[i](i, row_reader_.get()));
row->SetField(i, field);
}
row->AddDataHolder(bytes);
return row;
}

Expand Down Expand Up @@ -175,7 +253,7 @@ Result<RowCompactedSerializer::FieldReader> RowCompactedSerializer::CreateFieldR
}
case arrow::Type::type::STRING: {
field_reader = [](int32_t pos, RowReader* reader) -> Result<VariantType> {
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadString());
PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadStringView());
return value;
};
break;
Expand Down Expand Up @@ -497,9 +575,9 @@ Result<const RowKind*> RowCompactedSerializer::RowReader::ReadRowKind() const {
return RowKind::FromByteValue(static_cast<int8_t>(b));
}

Result<BinaryString> RowCompactedSerializer::RowReader::ReadString() {
Result<std::string_view> RowCompactedSerializer::RowReader::ReadStringView() {
PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
BinaryString str = BinaryString::FromAddress(segment_, position_, length);
std::string_view str(segment_.GetArray()->data() + position_, length);
position_ += length;
return str;
}
Expand Down
15 changes: 14 additions & 1 deletion src/paimon/common/data/serializer/row_compacted_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
#pragma once
#include <functional>

#include "arrow/api.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_writer.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/common/memory/memory_slice.h"
#include "paimon/common/utils/var_length_int_utils.h"
namespace paimon {

class RowCompactedSerializer {
public:
static Result<std::unique_ptr<RowCompactedSerializer>> Create(
Expand All @@ -41,6 +43,12 @@ class RowCompactedSerializer {
const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool);

private:
struct FieldInfo {
arrow::Type::type type_id;
int32_t precision = -1;
int32_t scale = -1;
};

class RowWriter {
public:
RowWriter(int32_t header_size_in_bytes, const std::shared_ptr<MemoryPool>& pool);
Expand Down Expand Up @@ -134,7 +142,7 @@ class RowCompactedSerializer {
return value;
}

Result<BinaryString> ReadString();
Result<std::string_view> ReadStringView();

Result<std::shared_ptr<Bytes>> ReadBinary();

Expand All @@ -161,6 +169,10 @@ class RowCompactedSerializer {
int32_t position_ = 0;
};

/// Read and compare a single field from two RowReaders.
static Result<int32_t> CompareField(const FieldInfo& field_info, RowReader* reader1,
RowReader* reader2);

using FieldWriter = std::function<Status(int32_t, const VariantType&, RowWriter*)>;
using FieldReader = std::function<Result<VariantType>(int32_t, RowReader*)>;

Expand All @@ -184,4 +196,5 @@ class RowCompactedSerializer {
std::unique_ptr<RowWriter> row_writer_;
std::unique_ptr<RowReader> row_reader_;
};

} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TEST(RowCompactedSerializerTest, TestSimple) {
ASSERT_EQ(de_row->GetLong(4), 4294967295l);
ASSERT_EQ(de_row->GetFloat(5), 0.5);
ASSERT_EQ(de_row->GetDouble(6), 1.141);
ASSERT_EQ(de_row->GetString(7), BinaryString::FromString("20250327", pool.get()));
ASSERT_EQ(de_row->GetStringView(7), "20250327");
auto f9_bytes = std::make_shared<Bytes>("banana", pool.get());
ASSERT_EQ(*de_row->GetBinary(8), *f9_bytes);
ASSERT_EQ(de_row->GetDate(9), 2026);
Expand Down Expand Up @@ -450,7 +450,7 @@ TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) {
ASSERT_EQ(row->GetFieldCount(), 2);
auto inner_row = row->GetRow(0, 3);
ASSERT_EQ(inner_row->GetFieldCount(), 3);
ASSERT_EQ(row->GetString(1).ToString(), "Alice");
ASSERT_EQ(row->GetStringView(1), "Alice");

// test compatibility
std::vector<uint8_t> java_bytes = {
Expand Down Expand Up @@ -485,7 +485,7 @@ TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) {
ASSERT_EQ(row->GetFieldCount(), 2);
auto inner_row = row->GetRow(0, 3);
ASSERT_EQ(inner_row->GetFieldCount(), 3);
ASSERT_EQ(row->GetString(1).ToString(), "Bob");
ASSERT_EQ(row->GetStringView(1), "Bob");

// test compatibility
std::vector<uint8_t> java_bytes = {
Expand Down
24 changes: 19 additions & 5 deletions src/paimon/common/io/cache/cache_key.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class CacheKey {
virtual ~CacheKey() = default;

virtual bool IsIndex() const = 0;
virtual size_t HashCode() const = 0;
};

class PositionCacheKey : public CacheKey {
Expand All @@ -41,12 +42,11 @@ class PositionCacheKey : public CacheKey {
: file_path_(file_path), position_(position), length_(length), is_index_(is_index) {}

bool IsIndex() const override;

size_t HashCode() const override;
int64_t Position() const;
int32_t Length() const;

bool operator==(const PositionCacheKey& other) const;
size_t HashCode() const;

private:
static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL;
Expand All @@ -60,9 +60,23 @@ class PositionCacheKey : public CacheKey {

namespace std {
template <>
struct hash<paimon::PositionCacheKey> {
size_t operator()(const paimon::PositionCacheKey& key) const {
return key.HashCode();
struct hash<std::shared_ptr<paimon::CacheKey>> {
size_t operator()(const std::shared_ptr<paimon::CacheKey>& key) const {
return key ? key->HashCode() : 0;
}
};

template <>
struct equal_to<std::shared_ptr<paimon::CacheKey>> {
bool operator()(const std::shared_ptr<paimon::CacheKey>& lhs,
const std::shared_ptr<paimon::CacheKey>& rhs) const {
if (lhs == rhs) {
return true;
}
if (!lhs || !rhs) {
return false;
}
return lhs->HashCode() == rhs->HashCode();
}
};
} // namespace std
3 changes: 1 addition & 2 deletions src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ Status SortLookupStoreWriter::Close() {
PAIMON_RETURN_NOT_OK(writer_->Flush());
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BloomFilterHandle> bloom_filter_handle,
writer_->WriteBloomFilter());
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockHandle> index_block_handle,
writer_->WriteIndexBlock());
PAIMON_ASSIGN_OR_RAISE(BlockHandle index_block_handle, writer_->WriteIndexBlock());
PAIMON_RETURN_NOT_OK(writer_->WriteFooter(index_block_handle, bloom_filter_handle));
PAIMON_RETURN_NOT_OK(out_->Close());
return Status::OK();
Expand Down
Loading
Loading