diff --git a/src/paimon/common/data/generic_row.h b/src/paimon/common/data/generic_row.h index b5d96972..f2fcc02b 100644 --- a/src/paimon/common/data/generic_row.h +++ b/src/paimon/common/data/generic_row.h @@ -104,7 +104,11 @@ class GenericRow : public InternalRow { } void AddDataHolder(std::unique_ptr&& holder) { - holders_.push_back(std::move(holder)); + row_holder_.push_back(std::move(holder)); + } + + void AddDataHolder(const std::shared_ptr& bytes) { + bytes_holder_ = bytes; } bool GetBoolean(int32_t pos) const override { @@ -218,8 +222,9 @@ class GenericRow : public InternalRow { /// The array to store the actual internal format values. std::vector fields_; /// As GenericRow only holds string view for string data to avoid deep copy, original data must - /// be held in holders_ - std::vector> holders_; + /// be held in row holders_ or bytes holder + std::vector> row_holder_; + std::shared_ptr bytes_holder_; /// The kind of change that a row describes in a changelog. const RowKind* kind_; }; diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.cpp b/src/paimon/common/data/serializer/row_compacted_serializer.cpp index eb178843..a1c66514 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.cpp +++ b/src/paimon/common/data/serializer/row_compacted_serializer.cpp @@ -42,24 +42,102 @@ Result> RowCompactedSerializer::Create( schema, std::move(getters), std::move(writers), std::move(readers), pool)); } +Result RowCompactedSerializer::CompareField(const FieldInfo& field_info, + RowReader* reader1, RowReader* reader2) { + auto type = field_info.type_id; + switch (type) { + case arrow::Type::type::BOOL: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1); + } + case arrow::Type::type::INT8: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1); + } + case arrow::Type::type::INT16: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1); + } + case arrow::Type::type::INT32: + case arrow::Type::type::DATE32: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1); + } + case arrow::Type::type::INT64: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1); + } + case arrow::Type::type::FLOAT: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return FieldsComparator::CompareFloatingPoint(val1, val2); + } + case arrow::Type::type::DOUBLE: { + auto val1 = reader1->ReadValue(); + auto val2 = reader2->ReadValue(); + return FieldsComparator::CompareFloatingPoint(val1, val2); + } + case arrow::Type::type::STRING: + case arrow::Type::type::BINARY: { + PAIMON_ASSIGN_OR_RAISE(std::string_view val1, reader1->ReadStringView()); + PAIMON_ASSIGN_OR_RAISE(std::string_view val2, reader2->ReadStringView()); + int32_t cmp = val1.compare(val2); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + } + case arrow::Type::type::TIMESTAMP: { + PAIMON_ASSIGN_OR_RAISE(Timestamp val1, reader1->ReadTimestamp(field_info.precision)); + PAIMON_ASSIGN_OR_RAISE(Timestamp val2, reader2->ReadTimestamp(field_info.precision)); + return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1); + } + case arrow::Type::type::DECIMAL: { + PAIMON_ASSIGN_OR_RAISE(Decimal val1, + reader1->ReadDecimal(field_info.precision, field_info.scale)); + PAIMON_ASSIGN_OR_RAISE(Decimal val2, + reader2->ReadDecimal(field_info.precision, field_info.scale)); + int32_t cmp = val1.CompareTo(val2); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + } + default: + return Status::NotImplemented( + fmt::format("Do not support comparing type {} in CompareField", + static_cast(field_info.type_id))); + } +} + Result RowCompactedSerializer::CreateSliceComparator( const std::shared_ptr& schema, const std::shared_ptr& pool) { int32_t bit_set_in_bytes = RowCompactedSerializer::CalculateBitSetInBytes(schema->num_fields()); auto row_reader1 = std::make_shared(bit_set_in_bytes, pool); auto row_reader2 = std::make_shared(bit_set_in_bytes, pool); - std::vector readers(schema->num_fields()); - std::vector comparators(schema->num_fields()); + + std::vector field_infos(schema->num_fields()); for (int32_t i = 0; i < schema->num_fields(); i++) { auto field_type = schema->field(i)->type(); - PAIMON_ASSIGN_OR_RAISE(readers[i], CreateFieldReader(field_type, pool)); - PAIMON_ASSIGN_OR_RAISE(comparators[i], FieldsComparator::CompareVariant(i, field_type)); + field_infos[i].type_id = field_type->id(); + if (field_type->id() == arrow::Type::type::TIMESTAMP) { + auto timestamp_type = + arrow::internal::checked_pointer_cast(field_type); + assert(timestamp_type); + field_infos[i].precision = DateTimeUtils::GetPrecisionFromType(timestamp_type); + } else if (field_type->id() == arrow::Type::type::DECIMAL) { + auto decimal_type = + arrow::internal::checked_pointer_cast(field_type); + assert(decimal_type); + field_infos[i].precision = decimal_type->precision(); + field_infos[i].scale = decimal_type->scale(); + } } - auto comparator = [row_reader1, row_reader2, readers, comparators]( - const std::shared_ptr& slice1, - const std::shared_ptr& slice2) -> Result { - row_reader1->PointTo(slice1->GetSegment(), slice1->Offset()); - row_reader2->PointTo(slice2->GetSegment(), slice2->Offset()); - for (int32_t i = 0; i < static_cast(readers.size()); i++) { + + auto comparator = [row_reader1, row_reader2, field_infos]( + const MemorySlice& slice1, const MemorySlice& slice2) -> Result { + row_reader1->PointTo(slice1.GetSegment(), slice1.Offset()); + row_reader2->PointTo(slice2.GetSegment(), slice2.Offset()); + for (int32_t i = 0; i < static_cast(field_infos.size()); i++) { bool is_null1 = row_reader1->IsNullAt(i); bool is_null2 = row_reader2->IsNullAt(i); if (!is_null1 || !is_null2) { @@ -68,9 +146,9 @@ Result RowCompactedSerializer::CreateSliceComparat } else if (is_null2) { return 1; } else { - PAIMON_ASSIGN_OR_RAISE(VariantType field1, readers[i](i, row_reader1.get())); - PAIMON_ASSIGN_OR_RAISE(VariantType field2, readers[i](i, row_reader2.get())); - int32_t comp = comparators[i](field1, field2); + PAIMON_ASSIGN_OR_RAISE( + int32_t comp, + CompareField(field_infos[i], row_reader1.get(), row_reader2.get())); if (comp != 0) { return comp; } @@ -79,8 +157,7 @@ Result RowCompactedSerializer::CreateSliceComparat } return 0; }; - return std::function(const std::shared_ptr&, - const std::shared_ptr&)>(comparator); + return std::function(const MemorySlice&, const MemorySlice&)>(comparator); } Result> RowCompactedSerializer::SerializeToBytes(const InternalRow& row) { @@ -110,6 +187,7 @@ Result> RowCompactedSerializer::Deserialize( PAIMON_ASSIGN_OR_RAISE(VariantType field, readers_[i](i, row_reader_.get())); row->SetField(i, field); } + row->AddDataHolder(bytes); return row; } @@ -175,7 +253,7 @@ Result RowCompactedSerializer::CreateFieldR } case arrow::Type::type::STRING: { field_reader = [](int32_t pos, RowReader* reader) -> Result { - PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadString()); + PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadStringView()); return value; }; break; @@ -497,9 +575,9 @@ Result RowCompactedSerializer::RowReader::ReadRowKind() const { return RowKind::FromByteValue(static_cast(b)); } -Result RowCompactedSerializer::RowReader::ReadString() { +Result RowCompactedSerializer::RowReader::ReadStringView() { PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt()); - BinaryString str = BinaryString::FromAddress(segment_, position_, length); + std::string_view str(segment_.GetArray()->data() + position_, length); position_ += length; return str; } diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.h b/src/paimon/common/data/serializer/row_compacted_serializer.h index 8a4de50e..3b4fd2ce 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.h +++ b/src/paimon/common/data/serializer/row_compacted_serializer.h @@ -17,6 +17,7 @@ #pragma once #include +#include "arrow/api.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/data/binary_writer.h" #include "paimon/common/memory/memory_segment.h" @@ -24,6 +25,7 @@ #include "paimon/common/memory/memory_slice.h" #include "paimon/common/utils/var_length_int_utils.h" namespace paimon { + class RowCompactedSerializer { public: static Result> Create( @@ -41,6 +43,12 @@ class RowCompactedSerializer { const std::shared_ptr& schema, const std::shared_ptr& pool); private: + struct FieldInfo { + arrow::Type::type type_id; + int32_t precision = -1; + int32_t scale = -1; + }; + class RowWriter { public: RowWriter(int32_t header_size_in_bytes, const std::shared_ptr& pool); @@ -134,7 +142,7 @@ class RowCompactedSerializer { return value; } - Result ReadString(); + Result ReadStringView(); Result> ReadBinary(); @@ -161,6 +169,10 @@ class RowCompactedSerializer { int32_t position_ = 0; }; + /// Read and compare a single field from two RowReaders. + static Result CompareField(const FieldInfo& field_info, RowReader* reader1, + RowReader* reader2); + using FieldWriter = std::function; using FieldReader = std::function(int32_t, RowReader*)>; @@ -184,4 +196,5 @@ class RowCompactedSerializer { std::unique_ptr row_writer_; std::unique_ptr row_reader_; }; + } // namespace paimon diff --git a/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp index 6d183d6e..5a070530 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp +++ b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp @@ -75,7 +75,7 @@ TEST(RowCompactedSerializerTest, TestSimple) { ASSERT_EQ(de_row->GetLong(4), 4294967295l); ASSERT_EQ(de_row->GetFloat(5), 0.5); ASSERT_EQ(de_row->GetDouble(6), 1.141); - ASSERT_EQ(de_row->GetString(7), BinaryString::FromString("20250327", pool.get())); + ASSERT_EQ(de_row->GetStringView(7), "20250327"); auto f9_bytes = std::make_shared("banana", pool.get()); ASSERT_EQ(*de_row->GetBinary(8), *f9_bytes); ASSERT_EQ(de_row->GetDate(9), 2026); @@ -450,7 +450,7 @@ TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) { ASSERT_EQ(row->GetFieldCount(), 2); auto inner_row = row->GetRow(0, 3); ASSERT_EQ(inner_row->GetFieldCount(), 3); - ASSERT_EQ(row->GetString(1).ToString(), "Alice"); + ASSERT_EQ(row->GetStringView(1), "Alice"); // test compatibility std::vector java_bytes = { @@ -485,7 +485,7 @@ TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) { ASSERT_EQ(row->GetFieldCount(), 2); auto inner_row = row->GetRow(0, 3); ASSERT_EQ(inner_row->GetFieldCount(), 3); - ASSERT_EQ(row->GetString(1).ToString(), "Bob"); + ASSERT_EQ(row->GetStringView(1), "Bob"); // test compatibility std::vector java_bytes = { diff --git a/src/paimon/common/io/cache/cache.cpp b/src/paimon/common/io/cache/cache.cpp index 16f96a82..caa886ea 100644 --- a/src/paimon/common/io/cache/cache.cpp +++ b/src/paimon/common/io/cache/cache.cpp @@ -35,7 +35,7 @@ void NoCache::InvalidateAll() { // do nothing } -std::unordered_map, std::shared_ptr> NoCache::AsMap() { +CacheKeyMap NoCache::AsMap() { return {}; } diff --git a/src/paimon/common/io/cache/cache.h b/src/paimon/common/io/cache/cache.h index 8826e9bf..b362d3e7 100644 --- a/src/paimon/common/io/cache/cache.h +++ b/src/paimon/common/io/cache/cache.h @@ -42,7 +42,7 @@ class Cache { virtual void InvalidateAll() = 0; - virtual std::unordered_map, std::shared_ptr> AsMap() = 0; + virtual CacheKeyMap AsMap() = 0; }; class NoCache : public Cache { @@ -55,7 +55,7 @@ class NoCache : public Cache { const std::shared_ptr& value) override; void Invalidate(const std::shared_ptr& key) override; void InvalidateAll() override; - std::unordered_map, std::shared_ptr> AsMap() override; + CacheKeyMap AsMap() override; }; class CacheValue { diff --git a/src/paimon/common/io/cache/cache_key.cpp b/src/paimon/common/io/cache/cache_key.cpp index fdcac9f0..a8fef1f1 100644 --- a/src/paimon/common/io/cache/cache_key.cpp +++ b/src/paimon/common/io/cache/cache_key.cpp @@ -35,10 +35,13 @@ int32_t PositionCacheKey::Length() const { return length_; } -bool PositionCacheKey::operator==(const PositionCacheKey& other) const { - return file_path_ == other.file_path_ && position_ == other.position_ && - - length_ == other.length_ && is_index_ == other.is_index_; +bool PositionCacheKey::Equals(const CacheKey& other) const { + const auto* rhs = dynamic_cast(&other); + if (!rhs) { + return false; + } + return file_path_ == rhs->file_path_ && position_ == rhs->position_ && + length_ == rhs->length_ && is_index_ == rhs->is_index_; } size_t PositionCacheKey::HashCode() const { diff --git a/src/paimon/common/io/cache/cache_key.h b/src/paimon/common/io/cache/cache_key.h index 792f36a6..36450a22 100644 --- a/src/paimon/common/io/cache/cache_key.h +++ b/src/paimon/common/io/cache/cache_key.h @@ -16,14 +16,14 @@ #pragma once #include -#include #include #include - -#include "paimon/status.h" +#include namespace paimon { +class CacheValue; + class CacheKey { public: static std::shared_ptr ForPosition(const std::string& file_path, int64_t position, @@ -33,6 +33,8 @@ class CacheKey { virtual ~CacheKey() = default; virtual bool IsIndex() const = 0; + virtual size_t HashCode() const = 0; + virtual bool Equals(const CacheKey& other) const = 0; }; class PositionCacheKey : public CacheKey { @@ -41,13 +43,11 @@ class PositionCacheKey : public CacheKey { : file_path_(file_path), position_(position), length_(length), is_index_(is_index) {} bool IsIndex() const override; - + size_t HashCode() const override; + bool Equals(const CacheKey& other) const override; int64_t Position() const; int32_t Length() const; - bool operator==(const PositionCacheKey& other) const; - size_t HashCode() const; - private: static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL; @@ -56,13 +56,27 @@ class PositionCacheKey : public CacheKey { const int32_t length_; const bool is_index_; }; -} // namespace paimon -namespace std { -template <> -struct hash { - size_t operator()(const paimon::PositionCacheKey& key) const { - return key.HashCode(); +struct CacheKeyHash { + size_t operator()(const std::shared_ptr& key) const { + return key ? key->HashCode() : 0; } }; -} // namespace std + +struct CacheKeyEqual { + bool operator()(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const { + if (lhs == rhs) { + return true; + } + if (!lhs || !rhs) { + return false; + } + return lhs->Equals(*rhs); + } +}; + +using CacheKeyMap = std::unordered_map, std::shared_ptr, + CacheKeyHash, CacheKeyEqual>; + +} // namespace paimon diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp index 19e26851..b7c6b399 100644 --- a/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp +++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp @@ -45,8 +45,7 @@ Status SortLookupStoreWriter::Close() { PAIMON_RETURN_NOT_OK(writer_->Flush()); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr bloom_filter_handle, writer_->WriteBloomFilter()); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr index_block_handle, - writer_->WriteIndexBlock()); + PAIMON_ASSIGN_OR_RAISE(BlockHandle index_block_handle, writer_->WriteIndexBlock()); PAIMON_RETURN_NOT_OK(writer_->WriteFooter(index_block_handle, bloom_filter_handle)); PAIMON_RETURN_NOT_OK(out_->Close()); return Status::OK(); diff --git a/src/paimon/common/memory/memory_slice.cpp b/src/paimon/common/memory/memory_slice.cpp index 9ef05480..b8d3d0e4 100644 --- a/src/paimon/common/memory/memory_slice.cpp +++ b/src/paimon/common/memory/memory_slice.cpp @@ -19,23 +19,23 @@ #include "paimon/common/memory/memory_slice_input.h" namespace paimon { -std::shared_ptr MemorySlice::Wrap(const std::shared_ptr& bytes) { +MemorySlice MemorySlice::Wrap(const std::shared_ptr& bytes) { auto segment = MemorySegment::Wrap(bytes); - return std::make_shared(segment, 0, segment.Size()); + return MemorySlice(segment, 0, segment.Size()); } -std::shared_ptr MemorySlice::Wrap(const MemorySegment& segment) { - return std::make_shared(segment, 0, segment.Size()); +MemorySlice MemorySlice::Wrap(const MemorySegment& segment) { + return MemorySlice(segment, 0, segment.Size()); } MemorySlice::MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length) : segment_(segment), offset_(offset), length_(length) {} -std::shared_ptr MemorySlice::Slice(int32_t index, int32_t length) { +MemorySlice MemorySlice::Slice(int32_t index, int32_t length) const { if (index == 0 && length == length_) { - return shared_from_this(); + return *this; } - return std::make_shared(segment_, offset_ + index, length); + return MemorySlice(segment_, offset_ + index, length); } int32_t MemorySlice::Length() const { @@ -54,37 +54,36 @@ const MemorySegment& MemorySlice::GetSegment() const { return segment_; } -int8_t MemorySlice::ReadByte(int32_t position) { +int8_t MemorySlice::ReadByte(int32_t position) const { return segment_.GetValue(offset_ + position); } -int32_t MemorySlice::ReadInt(int32_t position) { +int32_t MemorySlice::ReadInt(int32_t position) const { return segment_.GetValue(offset_ + position); } -int16_t MemorySlice::ReadShort(int32_t position) { +int16_t MemorySlice::ReadShort(int32_t position) const { return segment_.GetValue(offset_ + position); } -int64_t MemorySlice::ReadLong(int32_t position) { +int64_t MemorySlice::ReadLong(int32_t position) const { return segment_.GetValue(offset_ + position); } -std::string_view MemorySlice::ReadStringView() { +std::string_view MemorySlice::ReadStringView() const { auto array = segment_.GetArray(); return {array->data() + offset_, static_cast(length_)}; } -std::shared_ptr MemorySlice::CopyBytes(MemoryPool* pool) { +std::shared_ptr MemorySlice::CopyBytes(MemoryPool* pool) const { auto bytes = std::make_shared(length_, pool); auto target = MemorySegment::Wrap(bytes); segment_.CopyTo(offset_, &target, 0, length_); return bytes; } -std::shared_ptr MemorySlice::ToInput() { - auto self = shared_from_this(); - return std::make_shared(self); +MemorySliceInput MemorySlice::ToInput() const { + return MemorySliceInput(*this); } } // namespace paimon diff --git a/src/paimon/common/memory/memory_slice.h b/src/paimon/common/memory/memory_slice.h index d3474ad5..d8bca924 100644 --- a/src/paimon/common/memory/memory_slice.h +++ b/src/paimon/common/memory/memory_slice.h @@ -31,34 +31,31 @@ class MemoryPool; class MemorySliceInput; /// Slice of a MemorySegment. -class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this { +class PAIMON_EXPORT MemorySlice { public: - static std::shared_ptr Wrap(const std::shared_ptr& bytes); - static std::shared_ptr Wrap(const MemorySegment& segment); + static MemorySlice Wrap(const std::shared_ptr& bytes); + static MemorySlice Wrap(const MemorySegment& segment); - using SliceComparator = std::function(const std::shared_ptr&, - const std::shared_ptr&)>; + using SliceComparator = std::function(const MemorySlice&, const MemorySlice&)>; public: - MemorySlice() = default; - MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length); - std::shared_ptr Slice(int32_t index, int32_t length); + MemorySlice Slice(int32_t index, int32_t length) const; int32_t Length() const; int32_t Offset() const; std::shared_ptr GetHeapMemory() const; const MemorySegment& GetSegment() const; - int8_t ReadByte(int32_t position); - int32_t ReadInt(int32_t position); - int16_t ReadShort(int32_t position); - int64_t ReadLong(int32_t position); - std::string_view ReadStringView(); + int8_t ReadByte(int32_t position) const; + int32_t ReadInt(int32_t position) const; + int16_t ReadShort(int32_t position) const; + int64_t ReadLong(int32_t position) const; + std::string_view ReadStringView() const; - std::shared_ptr CopyBytes(MemoryPool* pool); + std::shared_ptr CopyBytes(MemoryPool* pool) const; - std::shared_ptr ToInput(); + MemorySliceInput ToInput() const; private: MemorySegment segment_; diff --git a/src/paimon/common/memory/memory_slice_input.cpp b/src/paimon/common/memory/memory_slice_input.cpp index f08c195c..e46e0575 100644 --- a/src/paimon/common/memory/memory_slice_input.cpp +++ b/src/paimon/common/memory/memory_slice_input.cpp @@ -20,31 +20,30 @@ namespace paimon { -MemorySliceInput::MemorySliceInput(const std::shared_ptr& slice) - : slice_(slice), position_(0) {} +MemorySliceInput::MemorySliceInput(const MemorySlice& slice) : slice_(slice), position_(0) {} int32_t MemorySliceInput::Position() const { return position_; } Status MemorySliceInput::SetPosition(int32_t position) { - if (position < 0 || position > slice_->Length()) { + if (position < 0 || position > slice_.Length()) { return Status::IndexError("position " + std::to_string(position) + " index out of bounds"); } position_ = position; return Status::OK(); } -bool MemorySliceInput::IsReadable() { +bool MemorySliceInput::IsReadable() const { return Available() > 0; } -int32_t MemorySliceInput::Available() { - return slice_->Length() - position_; +int32_t MemorySliceInput::Available() const { + return slice_.Length() - position_; } int8_t MemorySliceInput::ReadByte() { - return slice_->ReadByte(position_++); + return slice_.ReadByte(position_++); } int8_t MemorySliceInput::ReadUnsignedByte() { @@ -52,7 +51,7 @@ int8_t MemorySliceInput::ReadUnsignedByte() { } int32_t MemorySliceInput::ReadInt() { - int v = slice_->ReadInt(position_); + int32_t v = slice_.ReadInt(position_); position_ += 4; if (NeedSwap()) { return EndianSwapValue(v); @@ -61,7 +60,7 @@ int32_t MemorySliceInput::ReadInt() { } int64_t MemorySliceInput::ReadLong() { - int64_t v = slice_->ReadLong(position_); + int64_t v = slice_.ReadLong(position_); position_ += 8; if (NeedSwap()) { return EndianSwapValue(v); @@ -100,8 +99,8 @@ bool MemorySliceInput::NeedSwap() const { return SystemByteOrder() != byte_order_; } -std::shared_ptr MemorySliceInput::ReadSlice(int length) { - auto slice = slice_->Slice(position_, length); +MemorySlice MemorySliceInput::ReadSlice(int32_t length) { + auto slice = slice_.Slice(position_, length); position_ += length; return slice; } diff --git a/src/paimon/common/memory/memory_slice_input.h b/src/paimon/common/memory/memory_slice_input.h index 7988f17f..f85de0da 100644 --- a/src/paimon/common/memory/memory_slice_input.h +++ b/src/paimon/common/memory/memory_slice_input.h @@ -33,15 +33,13 @@ class MemoryPool; /// Slice of a MemorySegment. class PAIMON_EXPORT MemorySliceInput { public: - MemorySliceInput() = default; - - explicit MemorySliceInput(const std::shared_ptr& slice); + explicit MemorySliceInput(const MemorySlice& slice); int32_t Position() const; Status SetPosition(int32_t position); - bool IsReadable(); - int32_t Available(); + bool IsReadable() const; + int32_t Available() const; int8_t ReadByte(); int8_t ReadUnsignedByte(); @@ -49,7 +47,7 @@ class PAIMON_EXPORT MemorySliceInput { int64_t ReadLong(); Result ReadVarLenInt(); Result ReadVarLenLong(); - std::shared_ptr ReadSlice(int length); + MemorySlice ReadSlice(int32_t length); void SetOrder(ByteOrder order); @@ -57,7 +55,7 @@ class PAIMON_EXPORT MemorySliceInput { bool NeedSwap() const; private: - std::shared_ptr slice_; + MemorySlice slice_; int32_t position_; ByteOrder byte_order_ = SystemByteOrder(); diff --git a/src/paimon/common/memory/memory_slice_output.cpp b/src/paimon/common/memory/memory_slice_output.cpp index 8588ee6a..6c53ce58 100644 --- a/src/paimon/common/memory/memory_slice_output.cpp +++ b/src/paimon/common/memory/memory_slice_output.cpp @@ -33,8 +33,8 @@ void MemorySliceOutput::Reset() { size_ = 0; } -std::unique_ptr MemorySliceOutput::ToSlice() { - return std::make_unique(segment_, 0, size_); +MemorySlice MemorySliceOutput::ToSlice() { + return MemorySlice(segment_, 0, size_); } template diff --git a/src/paimon/common/memory/memory_slice_output.h b/src/paimon/common/memory/memory_slice_output.h index 17e1b269..4941e56a 100644 --- a/src/paimon/common/memory/memory_slice_output.h +++ b/src/paimon/common/memory/memory_slice_output.h @@ -39,7 +39,7 @@ class PAIMON_EXPORT MemorySliceOutput { int32_t Size() const; void Reset(); - std::unique_ptr ToSlice(); + MemorySlice ToSlice(); template void WriteValue(T value); diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h index 3462039b..79a8924e 100644 --- a/src/paimon/common/sst/block_cache.h +++ b/src/paimon/common/sst/block_cache.h @@ -24,7 +24,6 @@ #include "paimon/fs/file_system.h" #include "paimon/reader/batch_reader.h" #include "paimon/result.h" - namespace paimon { class BlockCache { @@ -36,17 +35,24 @@ class BlockCache { ~BlockCache() = default; - Result GetBlock(int64_t position, int32_t length, bool is_index) { + Result GetBlock( + int64_t position, int32_t length, bool is_index, + std::function(const MemorySegment&)> decompress_func) { auto key = CacheKey::ForPosition(file_path_, position, length, is_index); - auto it = blocks_.find(key); if (it == blocks_.end()) { PAIMON_ASSIGN_OR_RAISE( MemorySegment segment, cache_manager_->GetPage( key, [&](const std::shared_ptr&) -> Result { - return ReadFrom(position, length); + PAIMON_ASSIGN_OR_RAISE(MemorySegment compress_data, + ReadFrom(position, length)); + if (!decompress_func) { + return compress_data; + } + return decompress_func(compress_data); })); + blocks_.insert({key, std::make_shared(segment)}); return segment; } @@ -74,6 +80,6 @@ class BlockCache { std::shared_ptr pool_; std::unique_ptr cache_manager_; - std::unordered_map, std::shared_ptr> blocks_; + CacheKeyMap blocks_; }; } // namespace paimon diff --git a/src/paimon/common/sst/block_entry.h b/src/paimon/common/sst/block_entry.h index cc833e68..b96f49a0 100644 --- a/src/paimon/common/sst/block_entry.h +++ b/src/paimon/common/sst/block_entry.h @@ -24,10 +24,9 @@ namespace paimon { struct BlockEntry { - BlockEntry(const std::shared_ptr& _key, const std::shared_ptr& _value) - : key(_key), value(_value) {} + BlockEntry(const MemorySlice& _key, const MemorySlice& _value) : key(_key), value(_value) {} - std::shared_ptr key; - std::shared_ptr value; + MemorySlice key; + MemorySlice value; }; } // namespace paimon diff --git a/src/paimon/common/sst/block_footer.cpp b/src/paimon/common/sst/block_footer.cpp index 5951eaff..1f74ed5b 100644 --- a/src/paimon/common/sst/block_footer.cpp +++ b/src/paimon/common/sst/block_footer.cpp @@ -20,8 +20,7 @@ namespace paimon { -Result> BlockFooter::ReadBlockFooter( - std::shared_ptr& input) { +Result> BlockFooter::ReadBlockFooter(MemorySliceInput* input) { auto offset = input->ReadLong(); auto size = input->ReadInt(); auto expected_entries = input->ReadLong(); @@ -31,7 +30,7 @@ Result> BlockFooter::ReadBlockFooter( } auto index_offset = input->ReadLong(); auto index_size = input->ReadInt(); - auto index_block_handle = std::make_shared(index_offset, index_size); + BlockHandle index_block_handle(index_offset, index_size); auto magic = input->ReadInt(); if (magic != MAGIC_NUMBER) { @@ -41,23 +40,23 @@ Result> BlockFooter::ReadBlockFooter( return std::make_unique(index_block_handle, bloom_filter_handle); } -std::shared_ptr BlockFooter::WriteBlockFooter(MemoryPool* pool) { - auto output = std::make_shared(ENCODED_LENGTH, pool); +MemorySlice BlockFooter::WriteBlockFooter(MemoryPool* pool) { + MemorySliceOutput output(ENCODED_LENGTH, pool); // 20 bytes if (!bloom_filter_handle_.get()) { - output->WriteValue(static_cast(0)); - output->WriteValue(static_cast(0)); - output->WriteValue(static_cast(0)); + output.WriteValue(static_cast(0)); + output.WriteValue(static_cast(0)); + output.WriteValue(static_cast(0)); } else { - output->WriteValue(bloom_filter_handle_->Offset()); - output->WriteValue(bloom_filter_handle_->Size()); - output->WriteValue(bloom_filter_handle_->ExpectedEntries()); + output.WriteValue(bloom_filter_handle_->Offset()); + output.WriteValue(bloom_filter_handle_->Size()); + output.WriteValue(bloom_filter_handle_->ExpectedEntries()); } // 12 bytes - output->WriteValue(index_block_handle_->Offset()); - output->WriteValue(index_block_handle_->Size()); + output.WriteValue(index_block_handle_.Offset()); + output.WriteValue(index_block_handle_.Size()); // 4 bytes - output->WriteValue(MAGIC_NUMBER); - return output->ToSlice(); + output.WriteValue(MAGIC_NUMBER); + return output.ToSlice(); } } // namespace paimon diff --git a/src/paimon/common/sst/block_footer.h b/src/paimon/common/sst/block_footer.h index b69ef0dd..96a7033e 100644 --- a/src/paimon/common/sst/block_footer.h +++ b/src/paimon/common/sst/block_footer.h @@ -30,24 +30,23 @@ namespace paimon { /// Footer of a block. class BlockFooter { public: - static Result> ReadBlockFooter( - std::shared_ptr& input); + static Result> ReadBlockFooter(MemorySliceInput* input); public: - BlockFooter(const std::shared_ptr& index_block_handle, + BlockFooter(const BlockHandle& index_block_handle, const std::shared_ptr& bloom_filter_handle) : index_block_handle_(index_block_handle), bloom_filter_handle_(bloom_filter_handle) {} ~BlockFooter() = default; - std::shared_ptr GetIndexBlockHandle() const { + const BlockHandle& GetIndexBlockHandle() const { return index_block_handle_; } std::shared_ptr GetBloomFilterHandle() const { return bloom_filter_handle_; } - std::shared_ptr WriteBlockFooter(MemoryPool* pool); + MemorySlice WriteBlockFooter(MemoryPool* pool); public: // 20 bytes for bloom filter handle, 12 bytes for index block handle, 4 bytes for magic number @@ -55,7 +54,7 @@ class BlockFooter { static constexpr int32_t MAGIC_NUMBER = 1481571681; private: - std::shared_ptr index_block_handle_; + BlockHandle index_block_handle_; std::shared_ptr bloom_filter_handle_; }; } // namespace paimon diff --git a/src/paimon/common/sst/block_handle.cpp b/src/paimon/common/sst/block_handle.cpp index 03f5c959..6c0239ff 100644 --- a/src/paimon/common/sst/block_handle.cpp +++ b/src/paimon/common/sst/block_handle.cpp @@ -20,11 +20,10 @@ namespace paimon { -Result> BlockHandle::ReadBlockHandle( - const std::shared_ptr& input) { +Result BlockHandle::ReadBlockHandle(MemorySliceInput* input) { PAIMON_ASSIGN_OR_RAISE(int64_t offset, input->ReadVarLenLong()); PAIMON_ASSIGN_OR_RAISE(int32_t size, input->ReadVarLenInt()); - return std::make_shared(offset, size); + return BlockHandle(offset, size); } BlockHandle::BlockHandle(int64_t offset, int32_t size) : offset_(offset), size_(size) {} @@ -46,10 +45,10 @@ std::string BlockHandle::ToString() const { "}"; } -std::shared_ptr BlockHandle::WriteBlockHandle(MemoryPool* pool) { - auto output = std::make_shared(MAX_ENCODED_LENGTH, pool); - output->WriteVarLenLong(offset_); - output->WriteVarLenInt(size_); - return output->ToSlice(); +MemorySlice BlockHandle::WriteBlockHandle(MemoryPool* pool) { + MemorySliceOutput output(MAX_ENCODED_LENGTH, pool); + output.WriteVarLenLong(offset_); + output.WriteVarLenInt(size_); + return output.ToSlice(); } } // namespace paimon diff --git a/src/paimon/common/sst/block_handle.h b/src/paimon/common/sst/block_handle.h index 76e9fd94..4c3f9419 100644 --- a/src/paimon/common/sst/block_handle.h +++ b/src/paimon/common/sst/block_handle.h @@ -27,8 +27,7 @@ namespace paimon { class BlockHandle { public: - static Result> ReadBlockHandle( - const std::shared_ptr& input); + static Result ReadBlockHandle(MemorySliceInput* input); public: BlockHandle(int64_t offset, int32_t size); @@ -39,7 +38,7 @@ class BlockHandle { int32_t GetFullBlockSize() const; std::string ToString() const; - std::shared_ptr WriteBlockHandle(MemoryPool* pool); + MemorySlice WriteBlockHandle(MemoryPool* pool); public: // max len for varlong is 9 bytes, max len for varint is 5 bytes diff --git a/src/paimon/common/sst/block_iterator.cpp b/src/paimon/common/sst/block_iterator.cpp index 80d127e4..221185b0 100644 --- a/src/paimon/common/sst/block_iterator.cpp +++ b/src/paimon/common/sst/block_iterator.cpp @@ -19,51 +19,62 @@ #include "paimon/common/sst/block_reader.h" namespace paimon { -BlockIterator::BlockIterator(const std::shared_ptr& reader) : reader_(reader) { - input_ = reader->BlockInput(); -} +BlockIterator::BlockIterator(const std::shared_ptr& reader) + : input_(reader->BlockInput()), reader_(reader) {} bool BlockIterator::HasNext() const { - return polled_.get() || input_->IsReadable(); + return polled_position_ >= 0 || input_.IsReadable(); } Result> BlockIterator::Next() { if (!HasNext()) { return Status::Invalid("no such element"); } - if (polled_.get()) { - return std::move(polled_); + if (polled_position_ >= 0) { + PAIMON_RETURN_NOT_OK(input_.SetPosition(polled_position_)); + polled_position_ = -1; + return ReadEntry(); } return ReadEntry(); } Result> BlockIterator::ReadEntry() { - PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_->ReadVarLenInt()); - auto key = input_->ReadSlice(key_length); - PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_->ReadVarLenInt()); - auto value = input_->ReadSlice(value_length); + PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt()); + auto key = input_.ReadSlice(key_length); + PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt()); + auto value = input_.ReadSlice(value_length); return std::make_unique(key, value); } -Result BlockIterator::SeekTo(const std::shared_ptr& target_key) { +Result BlockIterator::ReadKeyAndSkipValue() { + PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt()); + auto key = input_.ReadSlice(key_length); + PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt()); + PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + value_length)); + return key; +} + +Result BlockIterator::SeekTo(const MemorySlice& target_key) { int32_t left = 0; int32_t right = reader_->RecordCount() - 1; + polled_position_ = -1; while (left <= right) { int32_t mid = left + (right - left) / 2; - PAIMON_RETURN_NOT_OK(input_->SetPosition(reader_->SeekTo(mid))); - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr mid_entry, ReadEntry()); - PAIMON_ASSIGN_OR_RAISE(int32_t compare, reader_->Comparator()(mid_entry->key, target_key)); + int32_t entry_position = reader_->SeekTo(mid); + PAIMON_RETURN_NOT_OK(input_.SetPosition(entry_position)); + PAIMON_ASSIGN_OR_RAISE(MemorySlice mid_key, ReadKeyAndSkipValue()); + PAIMON_ASSIGN_OR_RAISE(int32_t compare, reader_->Comparator()(mid_key, target_key)); if (compare == 0) { - polled_ = std::move(mid_entry); + polled_position_ = entry_position; return true; } else if (compare > 0) { - polled_ = std::move(mid_entry); + polled_position_ = entry_position; right = mid - 1; } else { - polled_.reset(); + polled_position_ = -1; left = mid + 1; } } diff --git a/src/paimon/common/sst/block_iterator.h b/src/paimon/common/sst/block_iterator.h index 24455df7..506cf1c7 100644 --- a/src/paimon/common/sst/block_iterator.h +++ b/src/paimon/common/sst/block_iterator.h @@ -32,11 +32,17 @@ class BlockIterator { Result> ReadEntry(); - Result SeekTo(const std::shared_ptr& target_key); + Result SeekTo(const MemorySlice& target_key); private: - std::shared_ptr input_; - std::unique_ptr polled_; + /// Read only the key MemorySlice from the current position, skipping the value. + /// This avoids creating a value MemorySlice and BlockEntry during binary search. + Result ReadKeyAndSkipValue(); + + MemorySliceInput input_; + /// Position of the entry that should be returned by Next() after SeekTo. + /// -1 means no pending entry. + int32_t polled_position_ = -1; std::shared_ptr reader_; }; diff --git a/src/paimon/common/sst/block_reader.cpp b/src/paimon/common/sst/block_reader.cpp index 9eeefd52..8b6d3ceb 100644 --- a/src/paimon/common/sst/block_reader.cpp +++ b/src/paimon/common/sst/block_reader.cpp @@ -19,19 +19,19 @@ #include "paimon/common/sst/block_trailer.h" namespace paimon { -Result> BlockReader::Create(const std::shared_ptr& block, +Result> BlockReader::Create(const MemorySlice& block, MemorySlice::SliceComparator comparator) { - PAIMON_ASSIGN_OR_RAISE(BlockAlignedType type, From(block->ReadByte(block->Length() - 1))); + PAIMON_ASSIGN_OR_RAISE(BlockAlignedType type, From(block.ReadByte(block.Length() - 1))); const auto trailer_len = BlockTrailer::ENCODED_LENGTH; - int32_t size = block->ReadInt(block->Length() - trailer_len); + int32_t size = block.ReadInt(block.Length() - trailer_len); if (type == BlockAlignedType::ALIGNED) { - auto data = block->Slice(0, block->Length() - trailer_len); + auto data = block.Slice(0, block.Length() - trailer_len); return std::make_shared(data, size, std::move(comparator)); } else { int32_t index_length = size * 4; - int32_t index_offset = block->Length() - trailer_len - index_length; - auto data = block->Slice(0, index_offset); - auto index = block->Slice(index_offset, index_length); + int32_t index_offset = block.Length() - trailer_len - index_length; + auto data = block.Slice(0, index_offset); + auto index = block.Slice(index_offset, index_length); return std::make_shared(data, index, std::move(comparator)); } } @@ -41,15 +41,15 @@ std::unique_ptr BlockReader::Iterator() { return std::make_unique(ptr); } -std::shared_ptr BlockReader::BlockInput() { - return block_->ToInput(); +MemorySliceInput BlockReader::BlockInput() { + return block_.ToInput(); } int32_t BlockReader::RecordCount() const { return record_count_; } -MemorySlice::SliceComparator BlockReader::Comparator() const { +const MemorySlice::SliceComparator& BlockReader::Comparator() const { return comparator_; } diff --git a/src/paimon/common/sst/block_reader.h b/src/paimon/common/sst/block_reader.h index 542c5c24..37773807 100644 --- a/src/paimon/common/sst/block_reader.h +++ b/src/paimon/common/sst/block_reader.h @@ -34,33 +34,33 @@ class BlockReader : public std::enable_shared_from_this { public: virtual ~BlockReader() = default; - static Result> Create(const std::shared_ptr& block, + static Result> Create(const MemorySlice& block, MemorySlice::SliceComparator comparator); virtual int32_t SeekTo(int32_t record_position) = 0; int32_t RecordCount() const; - MemorySlice::SliceComparator Comparator() const; + const MemorySlice::SliceComparator& Comparator() const; std::unique_ptr Iterator(); - std::shared_ptr BlockInput(); + MemorySliceInput BlockInput(); protected: - BlockReader(const std::shared_ptr& block, int32_t record_count, + BlockReader(const MemorySlice& block, int32_t record_count, MemorySlice::SliceComparator comparator) : block_(block), comparator_(std::move(comparator)), record_count_(record_count) {} private: - std::shared_ptr block_; + MemorySlice block_; MemorySlice::SliceComparator comparator_; int32_t record_count_; }; class AlignedBlockReader : public BlockReader { public: - AlignedBlockReader(const std::shared_ptr& block, int32_t record_size, + AlignedBlockReader(const MemorySlice& block, int32_t record_size, MemorySlice::SliceComparator comparator) - : BlockReader(block, block->Length() / record_size, std::move(comparator)), + : BlockReader(block, block.Length() / record_size, std::move(comparator)), record_size_(record_size) {} int32_t SeekTo(int32_t record_position) override { @@ -73,17 +73,16 @@ class AlignedBlockReader : public BlockReader { class UnAlignedBlockReader : public BlockReader { public: - UnAlignedBlockReader(const std::shared_ptr& data, - const std::shared_ptr& index, + UnAlignedBlockReader(const MemorySlice& data, const MemorySlice& index, MemorySlice::SliceComparator comparator) - : BlockReader(data, index->Length() / 4, std::move(comparator)), index_(index) {} + : BlockReader(data, index.Length() / 4, std::move(comparator)), index_(index) {} int32_t SeekTo(int32_t record_position) override { - return index_->ReadInt(record_position * 4); + return index_.ReadInt(record_position * 4); } private: - std::shared_ptr index_; + MemorySlice index_; }; } // namespace paimon diff --git a/src/paimon/common/sst/block_trailer.cpp b/src/paimon/common/sst/block_trailer.cpp index abc08609..c1f54585 100644 --- a/src/paimon/common/sst/block_trailer.cpp +++ b/src/paimon/common/sst/block_trailer.cpp @@ -20,8 +20,7 @@ namespace paimon { -std::unique_ptr BlockTrailer::ReadBlockTrailer( - std::shared_ptr& input) { +std::unique_ptr BlockTrailer::ReadBlockTrailer(MemorySliceInput* input) { auto compress = input->ReadUnsignedByte(); auto crc32c = input->ReadInt(); return std::make_unique(compress, crc32c); @@ -42,10 +41,10 @@ std::string BlockTrailer::ToString() const { sstream.str() + "}"; } -std::shared_ptr BlockTrailer::WriteBlockTrailer(MemoryPool* pool) { - auto output = std::make_shared(ENCODED_LENGTH, pool); - output->WriteValue(compression_type_); - output->WriteValue(crc32c_); - return output->ToSlice(); +MemorySlice BlockTrailer::WriteBlockTrailer(MemoryPool* pool) { + MemorySliceOutput output(ENCODED_LENGTH, pool); + output.WriteValue(compression_type_); + output.WriteValue(crc32c_); + return output.ToSlice(); } } // namespace paimon diff --git a/src/paimon/common/sst/block_trailer.h b/src/paimon/common/sst/block_trailer.h index 0444fbad..9859db15 100644 --- a/src/paimon/common/sst/block_trailer.h +++ b/src/paimon/common/sst/block_trailer.h @@ -28,7 +28,7 @@ namespace paimon { /// Trailer of a block. class BlockTrailer { public: - static std::unique_ptr ReadBlockTrailer(std::shared_ptr& input); + static std::unique_ptr ReadBlockTrailer(MemorySliceInput* input); public: BlockTrailer(int8_t compression_type, int32_t crc32c) @@ -40,7 +40,7 @@ class BlockTrailer { int8_t CompressionType() const; std::string ToString() const; - std::shared_ptr WriteBlockTrailer(MemoryPool* pool); + MemorySlice WriteBlockTrailer(MemoryPool* pool); public: static constexpr int32_t ENCODED_LENGTH = 5; diff --git a/src/paimon/common/sst/block_writer.cpp b/src/paimon/common/sst/block_writer.cpp index b98cc9e0..911d65aa 100644 --- a/src/paimon/common/sst/block_writer.cpp +++ b/src/paimon/common/sst/block_writer.cpp @@ -45,7 +45,7 @@ void BlockWriter::Reset() { aligned_ = true; } -Result> BlockWriter::Finish() { +Result BlockWriter::Finish() { if (positions_.size() == 0) { // Do not use alignment mode, as it is impossible to calculate how many records are // inside when reading diff --git a/src/paimon/common/sst/block_writer.h b/src/paimon/common/sst/block_writer.h index ce76d944..06086b16 100644 --- a/src/paimon/common/sst/block_writer.h +++ b/src/paimon/common/sst/block_writer.h @@ -76,7 +76,7 @@ class BlockWriter { return memory; } - Result> Finish(); + Result Finish(); private: const int32_t size_; diff --git a/src/paimon/common/sst/sst_file_io_test.cpp b/src/paimon/common/sst/sst_file_io_test.cpp index 85320619..7bed0740 100644 --- a/src/paimon/common/sst/sst_file_io_test.cpp +++ b/src/paimon/common/sst/sst_file_io_test.cpp @@ -55,10 +55,9 @@ class SstFileIOTest : public ::testing::TestWithParam { dir_ = paimon::test::UniqueTestDirectory::Create(); fs_ = dir_->GetFileSystem(); pool_ = GetDefaultPool(); - comparator_ = [](const std::shared_ptr& a, - const std::shared_ptr& b) -> Result { - std::string_view va = a->ReadStringView(); - std::string_view vb = b->ReadStringView(); + comparator_ = [](const MemorySlice& a, const MemorySlice& b) -> Result { + std::string_view va = a.ReadStringView(); + std::string_view vb = b.ReadStringView(); if (va == vb) { return 0; } diff --git a/src/paimon/common/sst/sst_file_reader.cpp b/src/paimon/common/sst/sst_file_reader.cpp index 4ba31335..5b5238b3 100644 --- a/src/paimon/common/sst/sst_file_reader.cpp +++ b/src/paimon/common/sst/sst_file_reader.cpp @@ -15,10 +15,10 @@ */ #include "paimon/common/sst/sst_file_reader.h" +#include "fmt/format.h" #include "paimon/common/sst/sst_file_utils.h" #include "paimon/common/utils/crc32c.h" #include "paimon/common/utils/murmurhash_utils.h" - namespace paimon { Result> SstFileReader::Create( @@ -30,13 +30,14 @@ Result> SstFileReader::Create( std::make_shared(file_path, in, pool, std::make_unique()); // read footer - PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, - block_cache->GetBlock(file_len - BlockFooter::ENCODED_LENGTH, - BlockFooter::ENCODED_LENGTH, true)); + PAIMON_ASSIGN_OR_RAISE( + MemorySegment segment, + block_cache->GetBlock(file_len - BlockFooter::ENCODED_LENGTH, BlockFooter::ENCODED_LENGTH, + /*is_index=*/true, /*decompress_func=*/nullptr)); auto slice = MemorySlice::Wrap(segment); - auto input = slice->ToInput(); + auto input = slice.ToInput(); PAIMON_ASSIGN_OR_RAISE(std::unique_ptr footer, - BlockFooter::ReadBlockFooter(input)); + BlockFooter::ReadBlockFooter(&input)); // read bloom filter directly now auto bloom_filter_handle = footer->GetBloomFilterHandle(); @@ -45,9 +46,10 @@ Result> SstFileReader::Create( bloom_filter_handle->Size() || bloom_filter_handle->Offset())) { bloom_filter = std::make_shared(bloom_filter_handle->ExpectedEntries(), bloom_filter_handle->Size()); - PAIMON_ASSIGN_OR_RAISE(MemorySegment bloom_filter_data, - block_cache->GetBlock(bloom_filter_handle->Offset(), - bloom_filter_handle->Size(), true)); + PAIMON_ASSIGN_OR_RAISE( + MemorySegment bloom_filter_data, + block_cache->GetBlock(bloom_filter_handle->Offset(), bloom_filter_handle->Size(), + /*is_index=*/true, /*decompress_func=*/nullptr)); PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(bloom_filter_data)); } @@ -55,17 +57,20 @@ Result> SstFileReader::Create( auto index_block_handle = footer->GetIndexBlockHandle(); PAIMON_ASSIGN_OR_RAISE( MemorySegment trailer_data, - block_cache->GetBlock(index_block_handle->Offset() + index_block_handle->Size(), - BlockTrailer::ENCODED_LENGTH, true)); - auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput(); - auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); + block_cache->GetBlock(index_block_handle.Offset() + index_block_handle.Size(), + BlockTrailer::ENCODED_LENGTH, /*is_index=*/true, + /*decompress_func=*/nullptr)); + auto trailer_slice = MemorySlice::Wrap(trailer_data); + auto trailer_input = trailer_slice.ToInput(); + std::shared_ptr trailer = BlockTrailer::ReadBlockTrailer(&trailer_input); PAIMON_ASSIGN_OR_RAISE( MemorySegment block_data, - block_cache->GetBlock(index_block_handle->Offset(), index_block_handle->Size(), true)); - PAIMON_ASSIGN_OR_RAISE(MemorySegment uncompressed_data, - DecompressBlock(block_data, trailer, pool)); + block_cache->GetBlock(index_block_handle.Offset(), index_block_handle.Size(), true, + [pool, trailer](const MemorySegment& seg) -> Result { + return DecompressBlock(seg, trailer, pool); + })); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, - BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator)); + BlockReader::Create(MemorySlice::Wrap(block_data), comparator)); return std::shared_ptr( new SstFileReader(pool, block_cache, bloom_filter, reader, comparator)); } @@ -101,7 +106,7 @@ Result> SstFileReader::Lookup(const std::shared_ptrSeekTo(key_slice)); if (success) { PAIMON_ASSIGN_OR_RAISE(std::unique_ptr ret, current->Next()); - return ret->value->CopyBytes(pool_.get()); + return ret->value.CopyBytes(pool_.get()); } } return std::shared_ptr(); @@ -109,38 +114,33 @@ Result> SstFileReader::Lookup(const std::shared_ptr> SstFileReader::GetNextBlock( std::unique_ptr& index_iterator) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr ret, index_iterator->Next()); - auto& slice = ret->value; - auto input = slice->ToInput(); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr block_handle, - BlockHandle::ReadBlockHandle(input)); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, - ReadBlock(std::move(block_handle), false)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr block_entry, index_iterator->Next()); + auto block_input = block_entry->value.ToInput(); + PAIMON_ASSIGN_OR_RAISE(BlockHandle block_handle, BlockHandle::ReadBlockHandle(&block_input)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, ReadBlock(block_handle, false)); return reader->Iterator(); } -Result> SstFileReader::ReadBlock(std::shared_ptr&& handle, +Result> SstFileReader::ReadBlock(const BlockHandle& handle, bool index) { - auto block_handle = handle; - return ReadBlock(block_handle, index); -} - -Result> SstFileReader::ReadBlock( - const std::shared_ptr& handle, bool index) { - PAIMON_ASSIGN_OR_RAISE(MemorySegment trailer_data, - block_cache_->GetBlock(handle->Offset() + handle->Size(), - BlockTrailer::ENCODED_LENGTH, true)); - auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput(); - auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); - PAIMON_ASSIGN_OR_RAISE(MemorySegment block_data, - block_cache_->GetBlock(handle->Offset(), handle->Size(), index)); - PAIMON_ASSIGN_OR_RAISE(MemorySegment uncompressed_data, - DecompressBlock(block_data, trailer, pool_)); - return BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator_); + PAIMON_ASSIGN_OR_RAISE( + MemorySegment trailer_data, + block_cache_->GetBlock(handle.Offset() + handle.Size(), BlockTrailer::ENCODED_LENGTH, + /*is_index=*/true, /*decompress_func=*/nullptr)); + auto trailer_slice = MemorySlice::Wrap(trailer_data); + auto trailer_input = trailer_slice.ToInput(); + std::shared_ptr trailer = BlockTrailer::ReadBlockTrailer(&trailer_input); + PAIMON_ASSIGN_OR_RAISE( + MemorySegment block_data, + block_cache_->GetBlock(handle.Offset(), handle.Size(), index, + [this, trailer](const MemorySegment& seg) -> Result { + return DecompressBlock(seg, trailer, pool_); + })); + return BlockReader::Create(MemorySlice::Wrap(block_data), comparator_); } Result SstFileReader::DecompressBlock(const MemorySegment& compressed_data, - const std::unique_ptr& trailer, + const std::shared_ptr& trailer, const std::shared_ptr& pool) { auto input_memory = compressed_data.GetHeapMemory(); @@ -164,16 +164,19 @@ Result SstFileReader::DecompressBlock(const MemorySegment& compre return compressed_data; } else { auto decompressor = factory->GetDecompressor(); - auto input = MemorySlice::Wrap(compressed_data)->ToInput(); - PAIMON_ASSIGN_OR_RAISE(int32_t uncompressed_size, input->ReadVarLenInt()); + auto slice = MemorySlice::Wrap(compressed_data); + auto input = slice.ToInput(); + PAIMON_ASSIGN_OR_RAISE(int32_t uncompressed_size, input.ReadVarLenInt()); auto output = MemorySegment::AllocateHeapMemory(uncompressed_size, pool.get()); auto output_memory = output.GetHeapMemory(); PAIMON_ASSIGN_OR_RAISE( - int32_t uncompressed_length, - decompressor->Decompress(input_memory->data() + input->Position(), input->Available(), + int32_t actual_uncompressed_size, + decompressor->Decompress(input_memory->data() + input.Position(), input.Available(), output_memory->data(), output_memory->size())); - if (static_cast(uncompressed_length) != output_memory->size()) { - return Status::Invalid("Invalid data"); + if (static_cast(actual_uncompressed_size) != output_memory->size()) { + return Status::Invalid(fmt::format( + "Invalid data: expect uncompressed size {}, actual uncompressed size {}", + output_memory->size(), actual_uncompressed_size)); } return output; } diff --git a/src/paimon/common/sst/sst_file_reader.h b/src/paimon/common/sst/sst_file_reader.h index d92df3c0..89545b24 100644 --- a/src/paimon/common/sst/sst_file_reader.h +++ b/src/paimon/common/sst/sst_file_reader.h @@ -58,26 +58,19 @@ class SstFileReader { /// @param handle The block handle. /// @param index Whether read the block as an index. /// @return The reader of the target block. - Result> ReadBlock(std::shared_ptr&& handle, - bool index); - - /// @param handle The block handle. - /// @param index Whether read the block as an index. - /// @return The reader of the target block. - Result> ReadBlock(const std::shared_ptr& handle, - bool index); + Result> ReadBlock(const BlockHandle& handle, bool index); Status Close(); private: static Result DecompressBlock(const MemorySegment& compressed_data, - const std::unique_ptr& trailer, + const std::shared_ptr& trailer, const std::shared_ptr& pool); SstFileReader(const std::shared_ptr& pool, const std::shared_ptr& block_cache, const std::shared_ptr& bloom_filter, - const std::shared_ptr& index_block_reader, + const std::shared_ptr& index_block_reader, MemorySlice::SliceComparator comparator); private: diff --git a/src/paimon/common/sst/sst_file_writer.cpp b/src/paimon/common/sst/sst_file_writer.cpp index 786678d1..676cb866 100644 --- a/src/paimon/common/sst/sst_file_writer.cpp +++ b/src/paimon/common/sst/sst_file_writer.cpp @@ -46,8 +46,8 @@ Status SstFileWriter::Write(std::shared_ptr&& key, std::shared_ptr return Status::OK(); } -Status SstFileWriter::Write(std::shared_ptr& slice) { - auto data = slice->ReadStringView(); +Status SstFileWriter::Write(const MemorySlice& slice) { + auto data = slice.ReadStringView(); return WriteBytes(data.data(), data.size()); } @@ -56,24 +56,24 @@ Status SstFileWriter::Flush() { return Status::OK(); } - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr handle, - FlushBlockWriter(data_block_writer_)); + PAIMON_ASSIGN_OR_RAISE(BlockHandle handle, FlushBlockWriter(data_block_writer_.get())); - auto slice = handle->WriteBlockHandle(pool_.get()); - auto value = slice->CopyBytes(pool_.get()); + auto slice = handle.WriteBlockHandle(pool_.get()); + auto value = slice.CopyBytes(pool_.get()); index_block_writer_->Write(last_key_, value); return Status::OK(); } -Result> SstFileWriter::WriteIndexBlock() { - return FlushBlockWriter(index_block_writer_); +Result SstFileWriter::WriteIndexBlock() { + return FlushBlockWriter(index_block_writer_.get()); } Result> SstFileWriter::WriteBloomFilter() { if (!bloom_filter_) { return std::shared_ptr(); } - auto data = bloom_filter_->GetBitSet()->ToSlice()->ReadStringView(); + auto bf_slice = bloom_filter_->GetBitSet()->ToSlice(); + auto data = bf_slice.ReadStringView(); PAIMON_ASSIGN_OR_RAISE(int64_t bloom_filter_pos, out_->GetPos()); auto handle = std::make_shared(bloom_filter_pos, data.size(), bloom_filter_->ExpectedEntries()); @@ -83,20 +83,19 @@ Result> SstFileWriter::WriteBloomFilter() { return handle; } -Status SstFileWriter::WriteFooter(const std::shared_ptr& index_block_handle, +Status SstFileWriter::WriteFooter(const BlockHandle& index_block_handle, const std::shared_ptr& bloom_filter_handle) { - auto footer = std::make_shared(index_block_handle, bloom_filter_handle); - auto slice = footer->WriteBlockFooter(pool_.get()); - auto data = slice->ReadStringView(); + BlockFooter footer(index_block_handle, bloom_filter_handle); + auto slice = footer.WriteBlockFooter(pool_.get()); + auto data = slice.ReadStringView(); PAIMON_RETURN_NOT_OK(WriteBytes(data.data(), data.size())); return Status::OK(); } -Result> SstFileWriter::FlushBlockWriter( - std::unique_ptr& writer) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr memory_slice, writer->Finish()); +Result SstFileWriter::FlushBlockWriter(BlockWriter* writer) { + PAIMON_ASSIGN_OR_RAISE(MemorySlice memory_slice, writer->Finish()); - auto view = memory_slice->ReadStringView(); + auto view = memory_slice.ReadStringView(); std::shared_ptr buffer; BlockCompressionType compression_type = BlockCompressionType::NONE; @@ -119,17 +118,16 @@ Result> SstFileWriter::FlushBlockWriter( auto crc32c = CRC32C::calculate(view.data(), view.size()); auto compression_val = static_cast(static_cast(compression_type) & 0xFF); crc32c = CRC32C::calculate(&compression_val, 1, crc32c); - auto trailer_memory_slice = - std::make_shared(static_cast(compression_type), crc32c) - ->WriteBlockTrailer(pool_.get()); + auto trailer = BlockTrailer(static_cast(compression_type), crc32c); + auto trailer_memory_slice = trailer.WriteBlockTrailer(pool_.get()); PAIMON_ASSIGN_OR_RAISE(int64_t block_pos, out_->GetPos()); - auto block_handle = std::make_shared(block_pos, view.size()); + BlockHandle block_handle(block_pos, view.size()); // 1. write data PAIMON_RETURN_NOT_OK(WriteBytes(view.data(), view.size())); // 2. write trailer - auto trailer_data = trailer_memory_slice->ReadStringView(); + auto trailer_data = trailer_memory_slice.ReadStringView(); PAIMON_RETURN_NOT_OK(WriteBytes(trailer_data.data(), trailer_data.size())); writer->Reset(); diff --git a/src/paimon/common/sst/sst_file_writer.h b/src/paimon/common/sst/sst_file_writer.h index fb75fc3d..44851252 100644 --- a/src/paimon/common/sst/sst_file_writer.h +++ b/src/paimon/common/sst/sst_file_writer.h @@ -49,20 +49,20 @@ class SstFileWriter { Status Write(std::shared_ptr&& key, std::shared_ptr&& value); - Status Write(std::shared_ptr& slice); + Status Write(const MemorySlice& slice); Status Flush(); - Result> WriteIndexBlock(); + Result WriteIndexBlock(); // When bloom-filter is disabled, return nullptr. Result> WriteBloomFilter(); - Status WriteFooter(const std::shared_ptr& index_block_handle, + Status WriteFooter(const BlockHandle& index_block_handle, const std::shared_ptr& bloom_filter_handle); private: - Result> FlushBlockWriter(std::unique_ptr& writer); + Result FlushBlockWriter(BlockWriter* writer); Status WriteBytes(const char* data, size_t size); diff --git a/src/paimon/common/utils/bit_set.h b/src/paimon/common/utils/bit_set.h index 0c5d4856..662ddaff 100644 --- a/src/paimon/common/utils/bit_set.h +++ b/src/paimon/common/utils/bit_set.h @@ -39,8 +39,8 @@ class PAIMON_EXPORT BitSet { return segment_; } - std::shared_ptr ToSlice() { - return std::make_shared(segment_, offset_, byte_length_); + MemorySlice ToSlice() { + return MemorySlice(segment_, offset_, byte_length_); } int32_t Offset() const { diff --git a/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp b/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp index c6289e87..e4d3cb9a 100644 --- a/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp +++ b/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp @@ -61,11 +61,15 @@ LookupMergeTreeCompactRewriter::Create( auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema); // TODO(xinyu.lxy): set executor + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. ReadContextBuilder read_context_builder(path_factory_cache->RootPath()); read_context_builder.SetOptions(options.ToMap()) .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) - .WithMemoryPool(pool); + .SetPrefetchBatchCount(3) + .WithMemoryPool(pool) + .AddOption("parquet.read.enable-pre-buffer", "false"); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, read_context_builder.Finish()); diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp index 3a491fc1..e082acf4 100644 --- a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp @@ -70,11 +70,15 @@ Result> MergeTreeCompactRewriter::Crea auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(data_schema); // TODO(xinyu.lxy): set executor + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. ReadContextBuilder read_context_builder(path_factory_cache->RootPath()); read_context_builder.SetOptions(options.ToMap()) .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) - .WithMemoryPool(pool); + .SetPrefetchBatchCount(3) + .WithMemoryPool(pool) + .AddOption("parquet.read.enable-pre-buffer", "false"); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, read_context_builder.Finish()); @@ -202,6 +206,28 @@ Status MergeTreeCompactRewriter::MergeReadAndWrite( merge_file_split_read_->CreateSortMergeReaderForSection( section, partition_, dv_factory_, /*predicate=*/nullptr, data_file_path_factory, drop_delete)); + if (!rolling_writer) { + // Short-circuit logic: for no rolling writers, simply iterating through the KeyValue + // iterator is sufficient to ensure lookup merge function take effect. + while (true) { + if (cancellation_controller_->IsCancelled()) { + return Status::Cancelled("Compaction is cancelled"); + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr key_value_iter, + sort_merge_reader->NextBatch()); + if (key_value_iter == nullptr) { + break; + } + while (true) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, key_value_iter->HasNext()); + if (!has_next) { + break; + } + [[maybe_unused]] KeyValue kv = key_value_iter->Next(); + } + } + return Status::OK(); + } // consumer batch size is WriteBatchSize auto async_key_value_producer_consumer = @@ -219,9 +245,7 @@ Status MergeTreeCompactRewriter::MergeReadAndWrite( if (key_value_batch.batch == nullptr) { break; } - if (rolling_writer) { - PAIMON_RETURN_NOT_OK(rolling_writer->Write(std::move(key_value_batch))); - } + PAIMON_RETURN_NOT_OK(rolling_writer->Write(std::move(key_value_batch))); } return Status::OK(); } diff --git a/src/paimon/core/mergetree/lookup/persist_processor_test.cpp b/src/paimon/core/mergetree/lookup/persist_processor_test.cpp index aa204ba0..cc23512b 100644 --- a/src/paimon/core/mergetree/lookup/persist_processor_test.cpp +++ b/src/paimon/core/mergetree/lookup/persist_processor_test.cpp @@ -32,7 +32,7 @@ class PersistProcessorTest : public testing::Test { ASSERT_EQ(kv.value->GetFieldCount(), 4); ASSERT_FALSE(kv.value->IsNullAt(0)); - ASSERT_EQ(kv.value->GetString(0).ToString(), std::string("Alice")); + ASSERT_EQ(std::string(kv.value->GetStringView(0)), std::string("Alice")); ASSERT_EQ(kv.value->GetInt(1), 10); ASSERT_TRUE(kv.value->IsNullAt(2)); ASSERT_EQ(kv.value->GetDouble(3), 10.1); diff --git a/src/paimon/core/mergetree/lookup_levels.cpp b/src/paimon/core/mergetree/lookup_levels.cpp index 3ce12b4c..e90fe0c3 100644 --- a/src/paimon/core/mergetree/lookup_levels.cpp +++ b/src/paimon/core/mergetree/lookup_levels.cpp @@ -51,11 +51,15 @@ Result>> LookupLevels::Create( auto partition_schema = DataField::ConvertDataFieldsToArrowSchema(partition_fields); // TODO(xinyu.lxy): set executor + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. ReadContextBuilder read_context_builder(path_factory->RootPath()); read_context_builder.SetOptions(options.ToMap()) .EnablePrefetch(true) .SetPrefetchMaxParallelNum(1) - .WithMemoryPool(pool); + .SetPrefetchBatchCount(3) + .WithMemoryPool(pool) + .AddOption("parquet.read.enable-pre-buffer", "false"); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, read_context_builder.Finish()); PAIMON_ASSIGN_OR_RAISE( @@ -145,7 +149,12 @@ LookupLevels::LookupLevels( key_serializer_(std::move(key_serializer)), serializer_factory_(serializer_factory), lookup_store_factory_(lookup_store_factory) { - value_schema_ = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + if constexpr (std::is_same_v) { + // if T is FilePosition, only read key fields to create sst file is enough + value_schema_ = key_schema_; + } else { + value_schema_ = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + } read_schema_ = SpecialFields::CompleteSequenceAndValueKindField(value_schema_); } template @@ -191,6 +200,16 @@ Result> LookupLevels::CreateLookupFile( template Status LookupLevels::CreateSstFileFromDataFile(const std::shared_ptr& file, const std::string& kv_file_path) { + if constexpr (std::is_same_v) { + // Short-circuit logic: if T is bool, just write empty lookup file. + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr bloom_filter, + LookupStoreFactory::BfGenerator(file->row_count, options_, pool_.get())); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr kv_writer, + lookup_store_factory_->CreateWriter(fs_, kv_file_path, bloom_filter, pool_)); + return kv_writer->Close(); + } // Prepare reader to iterate KeyValue PAIMON_ASSIGN_OR_RAISE( std::vector> raw_readers, diff --git a/src/paimon/core/operation/append_only_file_store_write.cpp b/src/paimon/core/operation/append_only_file_store_write.cpp index 85329f34..88dfb927 100644 --- a/src/paimon/core/operation/append_only_file_store_write.cpp +++ b/src/paimon/core/operation/append_only_file_store_write.cpp @@ -44,7 +44,6 @@ #include "paimon/logging.h" #include "paimon/read_context.h" #include "paimon/result.h" - namespace arrow { class Schema; } // namespace arrow @@ -251,7 +250,12 @@ Result> AppendOnlyFileStoreWrite::CreateFilesReader const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, const std::vector>& files) const { ReadContextBuilder context_builder(root_path_); - context_builder.EnablePrefetch(true).SetPrefetchMaxParallelNum(1); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. + context_builder.EnablePrefetch(true) + .SetPrefetchMaxParallelNum(1) + .SetPrefetchBatchCount(3) + .AddOption("parquet.read.enable-pre-buffer", "false"); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, context_builder.Finish()); std::map map = options_.ToMap(); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_read_context, diff --git a/src/paimon/core/utils/fields_comparator.cpp b/src/paimon/core/utils/fields_comparator.cpp index 164822e0..4e41d2d3 100644 --- a/src/paimon/core/utils/fields_comparator.cpp +++ b/src/paimon/core/utils/fields_comparator.cpp @@ -183,99 +183,4 @@ Result FieldsComparator::CompareField( input_type->ToString(), field_idx)); } } - -Result FieldsComparator::CompareVariant( - int32_t field_idx, const std::shared_ptr& input_type) { - arrow::Type::type type = input_type->id(); - switch (type) { - case arrow::Type::type::BOOL: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - case arrow::Type::type::INT8: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - case arrow::Type::type::INT16: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - case arrow::Type::type::DATE32: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - - case arrow::Type::type::INT32: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - case arrow::Type::type::INT64: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - case arrow::Type::type::FLOAT: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return CompareFloatingPoint(lvalue, rvalue); - }); - case arrow::Type::type::DOUBLE: - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return CompareFloatingPoint(lvalue, rvalue); - }); - case arrow::Type::type::BINARY: - case arrow::Type::type::STRING: { - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - std::string_view lvalue = DataDefine::GetStringView(lhs); - std::string_view rvalue = DataDefine::GetStringView(rhs); - int32_t cmp = lvalue.compare(rvalue); - return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); - }); - } - case arrow::Type::type::TIMESTAMP: { - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); - }); - } - case arrow::Type::type::DECIMAL: { - return FieldsComparator::VariantComparatorFunc( - [](const VariantType& lhs, const VariantType& rhs) -> int32_t { - auto lvalue = DataDefine::GetVariantValue(lhs); - auto rvalue = DataDefine::GetVariantValue(rhs); - int32_t cmp = lvalue.CompareTo(rvalue); - return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); - }); - } - default: - return Status::NotImplemented(fmt::format("Do not support comparing {} type in idx {}", - input_type->ToString(), field_idx)); - } -} - } // namespace paimon diff --git a/src/paimon/core/utils/fields_comparator.h b/src/paimon/core/utils/fields_comparator.h index 052a8285..7368f8a0 100644 --- a/src/paimon/core/utils/fields_comparator.h +++ b/src/paimon/core/utils/fields_comparator.h @@ -51,12 +51,6 @@ class FieldsComparator { return sort_fields_; } - using VariantComparatorFunc = - std::function; - - static Result CompareVariant( - int32_t field_idx, const std::shared_ptr& input_type); - /// Java-compatible ordering for floating-point types: /// -infinity < -0.0 < +0.0 < +infinity < NaN == NaN /// for range index and sst key comparator diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 51e9be45..f81c0bdc 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -257,7 +257,14 @@ Result<::parquet::ReaderProperties> ParquetFileBatchReader::CreateReaderProperti const std::map& options) { ::parquet::ReaderProperties reader_properties; // TODO(jinli.zjw): set more ReaderProperties (compare with java) - reader_properties.enable_buffered_stream(); + PAIMON_ASSIGN_OR_RAISE( + bool enable_pre_buffer, + OptionsUtils::GetValueFromMap(options, PARQUET_READ_ENABLE_PRE_BUFFER, true)); + if (enable_pre_buffer) { + reader_properties.enable_buffered_stream(); + } else { + reader_properties.disable_buffered_stream(); + } return reader_properties; } @@ -270,7 +277,10 @@ Result<::parquet::ArrowReaderProperties> ParquetFileBatchReader::CreateArrowRead ::parquet::ArrowReaderProperties arrow_reader_props; // TODO(jinli.zjw): set more ArrowReaderProperties (compare with java) - arrow_reader_props.set_pre_buffer(true); + PAIMON_ASSIGN_OR_RAISE( + bool enable_pre_buffer, + OptionsUtils::GetValueFromMap(options, PARQUET_READ_ENABLE_PRE_BUFFER, true)); + arrow_reader_props.set_pre_buffer(enable_pre_buffer); arrow_reader_props.set_batch_size(static_cast(batch_size)); arrow_reader_props.set_use_threads(use_threads); PAIMON_ASSIGN_OR_RAISE(bool cache_lazy, OptionsUtils::GetValueFromMap( diff --git a/src/paimon/format/parquet/parquet_format_defs.h b/src/paimon/format/parquet/parquet_format_defs.h index 05046b70..9022dfcf 100644 --- a/src/paimon/format/parquet/parquet_format_defs.h +++ b/src/paimon/format/parquet/parquet_format_defs.h @@ -51,6 +51,9 @@ static inline const char PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT[] = static inline const char PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT[] = "parquet.read.predicate-node-count-limit"; +// Default is true. Compaction will set to false to reduce memory consumption. +static inline const char PARQUET_READ_ENABLE_PRE_BUFFER[] = "parquet.read.enable-pre-buffer"; + static constexpr uint32_t DEFAULT_PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT = 0; static constexpr uint32_t DEFAULT_PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT = 32 * 1024 * 1024; static constexpr uint32_t DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT = 512; diff --git a/test/inte/CMakeLists.txt b/test/inte/CMakeLists.txt index dd6e8d44..ae8c9f74 100644 --- a/test/inte/CMakeLists.txt +++ b/test/inte/CMakeLists.txt @@ -90,13 +90,6 @@ if(PAIMON_BUILD_TESTS) test_utils_static ${GTEST_LINK_TOOLCHAIN}) - add_paimon_test(key_value_compaction_inte_test - STATIC_LINK_LIBS - paimon_shared - ${TEST_STATIC_LINK_LIBS} - test_utils_static - ${GTEST_LINK_TOOLCHAIN}) - add_paimon_test(pk_compaction_inte_test STATIC_LINK_LIBS paimon_shared diff --git a/test/inte/key_value_compaction_inte_test.cpp b/test/inte/key_value_compaction_inte_test.cpp deleted file mode 100644 index ea3cd43e..00000000 --- a/test/inte/key_value_compaction_inte_test.cpp +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright 2026-present Alibaba Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include "arrow/c/bridge.h" -#include "gtest/gtest.h" -#include "paimon/commit_context.h" -#include "paimon/common/data/binary_row.h" -#include "paimon/common/factories/io_hook.h" -#include "paimon/common/utils/scope_guard.h" -#include "paimon/core/append/bucketed_append_compact_manager.h" -#include "paimon/core/io/data_file_meta.h" -#include "paimon/core/operation/append_only_file_store_write.h" -#include "paimon/core/operation/restore_files.h" -#include "paimon/core/table/sink/commit_message_impl.h" -#include "paimon/core/table/source/data_split_impl.h" -#include "paimon/executor.h" -#include "paimon/file_store_commit.h" -#include "paimon/file_store_write.h" -#include "paimon/result.h" -#include "paimon/testing/utils/binary_row_generator.h" -#include "paimon/testing/utils/data_generator.h" -#include "paimon/testing/utils/io_exception_helper.h" -#include "paimon/testing/utils/test_helper.h" -#include "paimon/testing/utils/testharness.h" -#include "paimon/write_context.h" - -namespace paimon::test { - -class KeyValueCompactionInteTest : public testing::Test, - public ::testing::WithParamInterface { - public: - void SetUp() override { - pool_ = GetDefaultPool(); - } - - void PrepareSimpleKeyValueData(const std::shared_ptr& gen, TestHelper* helper, - int64_t* identifier) { - auto& commit_identifier = *identifier; - std::vector datas_1; - datas_1.push_back( - BinaryRowGenerator::GenerateRow({std::string("Alice"), 10, 1, 11.1}, pool_.get())); - datas_1.push_back( - BinaryRowGenerator::GenerateRow({std::string("Bob"), 10, 0, 12.1}, pool_.get())); - datas_1.push_back( - BinaryRowGenerator::GenerateRow({std::string("Emily"), 10, 0, 13.1}, pool_.get())); - datas_1.push_back( - BinaryRowGenerator::GenerateRow({std::string("Tony"), 10, 0, 14.1}, pool_.get())); - datas_1.push_back( - BinaryRowGenerator::GenerateRow({std::string("Lucy"), 20, 1, 14.1}, pool_.get())); - ASSERT_OK_AND_ASSIGN(auto batches_1, gen->SplitArrayByPartitionAndBucket(datas_1)); - ASSERT_EQ(3, batches_1.size()); - ASSERT_OK_AND_ASSIGN( - auto commit_msgs, - helper->WriteAndCommit(std::move(batches_1), commit_identifier++, std::nullopt)); - ASSERT_OK_AND_ASSIGN(std::optional snapshot1, helper->LatestSnapshot()); - ASSERT_TRUE(snapshot1); - ASSERT_EQ(1, snapshot1.value().Id()); - ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); - ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); - - std::vector datas_2; - datas_2.push_back( - BinaryRowGenerator::GenerateRow({std::string("Emily"), 10, 0, 15.1}, pool_.get())); - datas_2.push_back( - BinaryRowGenerator::GenerateRow({std::string("Bob"), 10, 0, 12.1}, pool_.get())); - datas_2.push_back( - BinaryRowGenerator::GenerateRow({std::string("Alex"), 10, 0, 16.1}, pool_.get())); - datas_2.push_back( - BinaryRowGenerator::GenerateRow({std::string("Paul"), 20, 1, NullType()}, pool_.get())); - ASSERT_OK_AND_ASSIGN(auto batches_2, gen->SplitArrayByPartitionAndBucket(datas_2)); - ASSERT_EQ(2, batches_2.size()); - ASSERT_OK_AND_ASSIGN( - auto commit_msgs_2, - helper->WriteAndCommit(std::move(batches_2), commit_identifier++, std::nullopt)); - ASSERT_OK_AND_ASSIGN(std::optional snapshot2, helper->LatestSnapshot()); - ASSERT_TRUE(snapshot2); - ASSERT_EQ(2, snapshot2.value().Id()); - ASSERT_EQ(9, snapshot2.value().TotalRecordCount().value()); - ASSERT_EQ(4, snapshot2.value().DeltaRecordCount().value()); - - std::vector datas_3; - datas_3.push_back( - BinaryRowGenerator::GenerateRow({std::string("David"), 10, 0, 17.1}, pool_.get())); - ASSERT_OK_AND_ASSIGN(auto batches_3, gen->SplitArrayByPartitionAndBucket(datas_3)); - ASSERT_EQ(1, batches_3.size()); - ASSERT_OK_AND_ASSIGN( - auto commit_msgs_3, - helper->WriteAndCommit(std::move(batches_3), commit_identifier++, std::nullopt)); - ASSERT_OK_AND_ASSIGN(std::optional snapshot3, helper->LatestSnapshot()); - ASSERT_TRUE(snapshot3); - ASSERT_EQ(3, snapshot3.value().Id()); - ASSERT_EQ(10, snapshot3.value().TotalRecordCount().value()); - ASSERT_EQ(1, snapshot3.value().DeltaRecordCount().value()); - } - - private: - std::shared_ptr pool_; -}; - -std::vector GetTestValuesForCompactionInteTest() { - std::vector values; - values.emplace_back("parquet"); -#ifdef PAIMON_ENABLE_ORC - values.emplace_back("orc"); -#endif -#ifdef PAIMON_ENABLE_LANCE - values.emplace_back("lance"); -#endif -#ifdef PAIMON_ENABLE_AVRO - values.emplace_back("avro"); -#endif - return values; -} - -INSTANTIATE_TEST_SUITE_P(FileFormat, KeyValueCompactionInteTest, - ::testing::ValuesIn(GetTestValuesForCompactionInteTest())); - -TEST_F(KeyValueCompactionInteTest, TestKeyValueTableCompactionWithIOException) { - arrow::FieldVector fields = { - arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), - arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; - auto schema = arrow::schema(fields); - - std::vector primary_keys = {"f0", "f1", "f2"}; - std::vector partition_keys = {"f1"}; - std::map options = {{Options::FILE_FORMAT, "parquet"}, - {Options::BUCKET, "2"}, - {Options::BUCKET_KEY, "f2"}, - {Options::FILE_SYSTEM, "local"}, - {Options::DELETION_VECTORS_ENABLED, "false"}}; - - bool compaction_run_complete = false; - auto io_hook = IOHook::GetInstance(); - for (size_t i = 0; i < 600; ++i) { - auto dir = UniqueTestDirectory::Create(); - ASSERT_TRUE(dir); - - ASSERT_OK_AND_ASSIGN(auto helper, - TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, - options, /*is_streaming_mode=*/true)); - ASSERT_OK_AND_ASSIGN(std::optional> table_schema, - helper->LatestSchema()); - ASSERT_TRUE(table_schema); - - auto gen = std::make_shared(table_schema.value(), pool_); - int64_t commit_identifier = 0; - PrepareSimpleKeyValueData(gen, helper.get(), &commit_identifier); - - std::vector data; - data.push_back( - BinaryRowGenerator::GenerateRow({std::string("Lily"), 10, 0, 17.1}, pool_.get())); - ASSERT_OK_AND_ASSIGN(auto batches, gen->SplitArrayByPartitionAndBucket(data)); - ASSERT_EQ(1, batches.size()); - - ASSERT_OK_AND_ASSIGN( - auto helper2, - TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, - /*is_streaming_mode=*/true, /*ignore_if_exists=*/true)); - - ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); - io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); - - CHECK_HOOK_STATUS(helper2->write_->Write(std::move(batches[0])), i); - CHECK_HOOK_STATUS(helper2->write_->Compact(/*partition=*/{{"f1", "10"}}, /*bucket=*/1, - /*full_compaction=*/true), - i); - - Result>> commit_messages = - helper2->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier); - CHECK_HOOK_STATUS(commit_messages.status(), i); - CHECK_HOOK_STATUS(helper2->commit_->Commit(commit_messages.value(), commit_identifier), i); - - compaction_run_complete = true; - io_hook->Clear(); - - ASSERT_OK_AND_ASSIGN(std::optional latest_snapshot, helper2->LatestSnapshot()); - ASSERT_TRUE(latest_snapshot); - ASSERT_EQ(Snapshot::CommitKind::Compact(), latest_snapshot->GetCommitKind()); - break; - } - - ASSERT_TRUE(compaction_run_complete); -} - -TEST_P(KeyValueCompactionInteTest, TestKeyValueTableStreamWriteFullCompaction) { - auto dir = UniqueTestDirectory::Create(); - ASSERT_TRUE(dir); - arrow::FieldVector fields = { - arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), - arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; - auto schema = arrow::schema(fields); - - std::vector primary_keys = {"f0", "f1", "f2"}; - std::vector partition_keys = {"f1"}; - auto file_format = GetParam(); - std::map options = {{Options::FILE_FORMAT, "parquet"}, - {Options::BUCKET, "2"}, - {Options::BUCKET_KEY, "f2"}, - {Options::FILE_SYSTEM, "local"}, - {Options::DELETION_VECTORS_ENABLED, "false"}}; - - ASSERT_OK_AND_ASSIGN( - auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, - /*is_streaming_mode=*/true)); - ASSERT_OK_AND_ASSIGN(std::optional> table_schema, - helper->LatestSchema()); - ASSERT_TRUE(table_schema); - auto gen = std::make_shared(table_schema.value(), pool_); - int64_t commit_identifier = 0; - PrepareSimpleKeyValueData(gen, helper.get(), &commit_identifier); - std::vector datas_4; - datas_4.push_back( - BinaryRowGenerator::GenerateRow({std::string("Lily"), 10, 0, 17.1}, pool_.get())); - ASSERT_OK_AND_ASSIGN(auto batches_4, gen->SplitArrayByPartitionAndBucket(datas_4)); - ASSERT_EQ(1, batches_4.size()); - - ASSERT_OK(helper->write_->Write(std::move(batches_4[0]))); - ASSERT_OK(helper->write_->Compact(/*partition=*/{{"f1", "10"}}, /*bucket=*/1, - /*full_compaction=*/true)); - ASSERT_OK_AND_ASSIGN( - std::vector> commit_messages, - helper->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier)); - ASSERT_OK(helper->commit_->Commit(commit_messages, commit_identifier)); - ASSERT_OK_AND_ASSIGN(std::optional snapshot5, helper->LatestSnapshot()); - ASSERT_EQ(5, snapshot5.value().Id()); - ASSERT_EQ(9, snapshot5.value().TotalRecordCount().value()); - ASSERT_EQ(-2, snapshot5.value().DeltaRecordCount().value()); - ASSERT_EQ(Snapshot::CommitKind::Compact(), snapshot5.value().GetCommitKind()); - ASSERT_OK_AND_ASSIGN(std::vector> data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_EQ(data_splits.size(), 3); - std::map, std::string> expected_datas; - expected_datas[std::make_pair("f1=10/", 0)] = R"([ -[0, "Alice", 10, 1, 11.1] -])"; - - expected_datas[std::make_pair("f1=10/", 1)] = R"([ -[0, "Alex", 10, 0, 16.1], -[0, "Bob", 10, 0, 12.1], -[0, "David", 10, 0, 17.1], -[0, "Emily", 10, 0, 15.1], -[0, "Lily", 10, 0, 17.1], -[0, "Tony", 10, 0, 14.1] -])"; - - expected_datas[std::make_pair("f1=20/", 0)] = R"([ -[0, "Lucy", 20, 1, 14.1], -[0, "Paul", 20, 1, null] -])"; - - arrow::FieldVector fields_with_row_kind = fields; - fields_with_row_kind.insert(fields_with_row_kind.begin(), - arrow::field("_VALUE_KIND", arrow::int8())); - auto data_type = arrow::struct_(fields_with_row_kind); - - for (const auto& split : data_splits) { - auto split_impl = dynamic_cast(split.get()); - ASSERT_OK_AND_ASSIGN(std::string partition_str, - helper->PartitionStr(split_impl->Partition())); - auto iter = expected_datas.find(std::make_pair(partition_str, split_impl->Bucket())); - ASSERT_TRUE(iter != expected_datas.end()); - ASSERT_OK_AND_ASSIGN(bool success, - helper->ReadAndCheckResult(data_type, {split}, iter->second)); - ASSERT_TRUE(success); - } -} - -} // namespace paimon::test diff --git a/test/inte/pk_compaction_inte_test.cpp b/test/inte/pk_compaction_inte_test.cpp index 4b65cd81..e34118d4 100644 --- a/test/inte/pk_compaction_inte_test.cpp +++ b/test/inte/pk_compaction_inte_test.cpp @@ -26,6 +26,7 @@ #include "gtest/gtest.h" #include "paimon/catalog/catalog.h" #include "paimon/commit_context.h" +#include "paimon/common/factories/io_hook.h" #include "paimon/common/utils/path_util.h" #include "paimon/core/deletionvectors/deletion_vectors_index_file.h" #include "paimon/core/io/data_file_meta.h" @@ -47,6 +48,8 @@ #include "paimon/status.h" #include "paimon/table/source/table_read.h" #include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/data_generator.h" +#include "paimon/testing/utils/io_exception_helper.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/test_helper.h" #include "paimon/testing/utils/testharness.h" @@ -54,9 +57,11 @@ namespace paimon::test { -class PkCompactionInteTest : public ::testing::Test { +class PkCompactionInteTest : public ::testing::Test, + public ::testing::WithParamInterface { public: void SetUp() override { + pool_ = GetDefaultPool(); dir_ = UniqueTestDirectory::Create("local"); } @@ -87,6 +92,65 @@ class PkCompactionInteTest : public ::testing::Test { } // ---- Write helpers ---- + void PrepareSimpleKeyValueData(const std::shared_ptr& gen, TestHelper* helper, + int64_t* identifier) { + auto& commit_identifier = *identifier; + std::vector datas_1; + datas_1.push_back( + BinaryRowGenerator::GenerateRow({std::string("Alice"), 10, 1, 11.1}, pool_.get())); + datas_1.push_back( + BinaryRowGenerator::GenerateRow({std::string("Bob"), 10, 0, 12.1}, pool_.get())); + datas_1.push_back( + BinaryRowGenerator::GenerateRow({std::string("Emily"), 10, 0, 13.1}, pool_.get())); + datas_1.push_back( + BinaryRowGenerator::GenerateRow({std::string("Tony"), 10, 0, 14.1}, pool_.get())); + datas_1.push_back( + BinaryRowGenerator::GenerateRow({std::string("Lucy"), 20, 1, 14.1}, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto batches_1, gen->SplitArrayByPartitionAndBucket(datas_1)); + ASSERT_EQ(3, batches_1.size()); + ASSERT_OK_AND_ASSIGN( + auto commit_msgs, + helper->WriteAndCommit(std::move(batches_1), commit_identifier++, std::nullopt)); + ASSERT_OK_AND_ASSIGN(std::optional snapshot1, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot1); + ASSERT_EQ(1, snapshot1.value().Id()); + ASSERT_EQ(5, snapshot1.value().TotalRecordCount().value()); + ASSERT_EQ(5, snapshot1.value().DeltaRecordCount().value()); + + std::vector datas_2; + datas_2.push_back( + BinaryRowGenerator::GenerateRow({std::string("Emily"), 10, 0, 15.1}, pool_.get())); + datas_2.push_back( + BinaryRowGenerator::GenerateRow({std::string("Bob"), 10, 0, 12.1}, pool_.get())); + datas_2.push_back( + BinaryRowGenerator::GenerateRow({std::string("Alex"), 10, 0, 16.1}, pool_.get())); + datas_2.push_back( + BinaryRowGenerator::GenerateRow({std::string("Paul"), 20, 1, NullType()}, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto batches_2, gen->SplitArrayByPartitionAndBucket(datas_2)); + ASSERT_EQ(2, batches_2.size()); + ASSERT_OK_AND_ASSIGN( + auto commit_msgs_2, + helper->WriteAndCommit(std::move(batches_2), commit_identifier++, std::nullopt)); + ASSERT_OK_AND_ASSIGN(std::optional snapshot2, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot2); + ASSERT_EQ(2, snapshot2.value().Id()); + ASSERT_EQ(9, snapshot2.value().TotalRecordCount().value()); + ASSERT_EQ(4, snapshot2.value().DeltaRecordCount().value()); + + std::vector datas_3; + datas_3.push_back( + BinaryRowGenerator::GenerateRow({std::string("David"), 10, 0, 17.1}, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto batches_3, gen->SplitArrayByPartitionAndBucket(datas_3)); + ASSERT_EQ(1, batches_3.size()); + ASSERT_OK_AND_ASSIGN( + auto commit_msgs_3, + helper->WriteAndCommit(std::move(batches_3), commit_identifier++, std::nullopt)); + ASSERT_OK_AND_ASSIGN(std::optional snapshot3, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot3); + ASSERT_EQ(3, snapshot3.value().Id()); + ASSERT_EQ(10, snapshot3.value().TotalRecordCount().value()); + ASSERT_EQ(1, snapshot3.value().DeltaRecordCount().value()); + } Result>> WriteArray( const std::string& table_path, const std::map& partition, @@ -211,6 +275,7 @@ class PkCompactionInteTest : public ::testing::Test { } private: + std::shared_ptr pool_; std::unique_ptr dir_; arrow::FieldVector fields_; }; @@ -1756,18 +1821,15 @@ TEST_F(PkCompactionInteTest, TestPartialUpdateWithDV) { // Step 1: Write initial data with all fields non-null and large padding. { + // clang-format off std::string json_data = R"([ - ["Alice", 10, 10, 1.0, ")" + - padding + R"("], - ["Bob", 10, 20, 2.0, ")" + - padding + R"("], - ["Carol", 10, 30, 3.0, ")" + - padding + R"("], - ["Dave", 10, 40, 4.0, ")" + - padding + R"("], - ["Eve", 10, 50, 5.0, ")" + - padding + R"("] + ["Alice", 10, 10, 1.0, ")" + padding + R"("], + ["Bob", 10, 20, 2.0, ")" + padding + R"("], + ["Carol", 10, 30, 3.0, ")" + padding + R"("], + ["Dave", 10, 40, 4.0, ")" + padding + R"("], + ["Eve", 10, 50, 5.0, ")" + padding + R"("] ])"; + // clang-format on auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, json_data).ValueOrDie(); ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, commit_id++)); } @@ -1825,15 +1887,15 @@ TEST_F(PkCompactionInteTest, TestPartialUpdateWithDV) { // then intermediate-level file (Alice, Bob, Carol with merged values). { std::map, std::string> expected_data; + // clang-format off expected_data[std::make_pair("f1=10/", 0)] = R"([ - [0, "Dave", 10, 40, 4.0, ")" + padding + - R"("], - [0, "Eve", 10, 50, 5.0, ")" + padding + - R"("], + [0, "Dave", 10, 40, 4.0, ")" + padding + R"("], + [0, "Eve", 10, 50, 5.0, ")" + padding + R"("], [0, "Alice", 10, 99, 1.0, "u1"], [0, "Bob", 10, 88, 22.0, "u3"], [0, "Carol", 10, 30, 33.0, "u4"] ])"; + // clang-format on ScanAndVerify(table_path, fields, expected_data); } @@ -1847,6 +1909,7 @@ TEST_F(PkCompactionInteTest, TestPartialUpdateWithDV) { // Step 8: ScanAndVerify after full compact (globally sorted after merge). { std::map, std::string> expected_data; + // clang-format off expected_data[std::make_pair("f1=10/", 0)] = R"([ [0, "Alice", 10, 99, 1.0, "u1"], [0, "Bob", 10, 88, 22.0, "u3"], @@ -1856,6 +1919,7 @@ TEST_F(PkCompactionInteTest, TestPartialUpdateWithDV) { [0, "Eve", 10, 50, 5.0, ")" + padding + R"("] ])"; + // clang-format on ScanAndVerify(table_path, fields, expected_data); } } @@ -1882,18 +1946,15 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithDvInAllLevels) { // Step 1: Write initial data with large padding → full compact → single L5 file. { + // clang-format off std::string json_data = R"([ - ["Alice", 10, 0, 1.0, ")" + - padding + R"("], - ["Bob", 10, 0, 2.0, ")" + - padding + R"("], - ["Carol", 10, 0, 3.0, ")" + - padding + R"("], - ["Dave", 10, 0, 4.0, ")" + - padding + R"("], - ["Eve", 10, 0, 5.0, ")" + - padding + R"("] + ["Alice", 10, 0, 1.0, ")" + padding + R"("], + ["Bob", 10, 0, 2.0, ")" + padding + R"("], + ["Carol", 10, 0, 3.0, ")" + padding + R"("], + ["Dave", 10, 0, 4.0, ")" + padding + R"("], + ["Eve", 10, 0, 5.0, ")" + padding + R"("] ])"; + // clang-format on auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, json_data).ValueOrDie(); ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, commit_id++)); @@ -2020,18 +2081,15 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithForceLookupNoDv) { // Step 1: Write initial data with large padding field (creates a big level-0 file). { + // clang-format off std::string json_data = R"([ - ["Alice", 10, 0, 1.0, ")" + - padding + R"("], - ["Bob", 10, 0, 2.0, ")" + - padding + R"("], - ["Carol", 10, 0, 3.0, ")" + - padding + R"("], - ["Dave", 10, 0, 4.0, ")" + - padding + R"("], - ["Eve", 10, 0, 5.0, ")" + - padding + R"("] + ["Alice", 10, 0, 1.0, ")" + padding + R"("], + ["Bob", 10, 0, 2.0, ")" + padding + R"("], + ["Carol", 10, 0, 3.0, ")" + padding + R"("], + ["Dave", 10, 0, 4.0, ")" + padding + R"("], + ["Eve", 10, 0, 5.0, ")" + padding + R"("] ])"; + // clang-format on auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, json_data).ValueOrDie(); ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, commit_id++)); } @@ -2095,6 +2153,7 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithForceLookupNoDv) { // Step 8: ScanAndVerify after full compact (globally sorted after merge). { + // clang-format off std::map, std::string> expected_data; expected_data[std::make_pair("f1=10/", 0)] = R"([ [0, "Alice", 10, 0, 101.0, "u1"], @@ -2103,6 +2162,7 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithForceLookupNoDv) { [0, "Dave", 10, 0, 4.0, ")" + padding + R"("], [0, "Eve", 10, 0, 5.0, ")" + padding + R"("] ])"; + // clang-format on ScanAndVerify(table_path, fields, expected_data); } } @@ -2300,12 +2360,12 @@ TEST_F(PkCompactionInteTest, TestDuplicateWithDvAndOrphanDelete) { // Step 1: Write INSERT rows for Alice and Bob only. { + // clang-format off std::string json_data = R"([ - ["Alice", 10, 100, 1.0, ")" + - padding + R"("], - ["Bob", 10, 200, 2.0, ")" + - padding + R"("] + ["Alice", 10, 100, 1.0, ")" + padding + R"("], + ["Bob", 10, 200, 2.0, ")" + padding + R"("] ])"; + // clang-format on auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, json_data).ValueOrDie(); ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, commit_id++)); } @@ -2345,11 +2405,12 @@ TEST_F(PkCompactionInteTest, TestDuplicateWithDvAndOrphanDelete) { // Step 5: ScanAndVerify after DV compact. { std::map, std::string> expected_data; + // clang-format off expected_data[std::make_pair("f1=10/", 0)] = R"([ - [0, "Bob", 10, 200, 2.0, ")" + padding + - R"("], + [0, "Bob", 10, 200, 2.0, ")" + padding + R"("], [0, "Dave", 10, 50, 5.0, "u3"] ])"; + // clang-format on ScanAndVerify(table_path, fields, expected_data); } @@ -2363,12 +2424,180 @@ TEST_F(PkCompactionInteTest, TestDuplicateWithDvAndOrphanDelete) { // Step 7: ScanAndVerify after full compact (globally sorted). { std::map, std::string> expected_data; + // clang-format off expected_data[std::make_pair("f1=10/", 0)] = R"([ - [0, "Bob", 10, 200, 2.0, ")" + padding + - R"("], + [0, "Bob", 10, 200, 2.0, ")" + padding + R"("], [0, "Dave", 10, 50, 5.0, "u3"] ])"; + // clang-format on ScanAndVerify(table_path, fields, expected_data); } } + +TEST_F(PkCompactionInteTest, TestKeyValueTableCompactionWithIOException) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + auto schema = arrow::schema(fields); + + std::vector primary_keys = {"f0", "f1", "f2"}; + std::vector partition_keys = {"f1"}; + std::map options = {{Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f2"}, + {Options::FILE_SYSTEM, "local"}, + {Options::DELETION_VECTORS_ENABLED, "false"}}; + + bool compaction_run_complete = false; + auto io_hook = IOHook::GetInstance(); + for (size_t i = 0; i < 600; ++i) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, + options, /*is_streaming_mode=*/true)); + ASSERT_OK_AND_ASSIGN(std::optional> table_schema, + helper->LatestSchema()); + ASSERT_TRUE(table_schema); + + auto gen = std::make_shared(table_schema.value(), pool_); + int64_t commit_identifier = 0; + PrepareSimpleKeyValueData(gen, helper.get(), &commit_identifier); + + std::vector data; + data.push_back( + BinaryRowGenerator::GenerateRow({std::string("Lily"), 10, 0, 17.1}, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto batches, gen->SplitArrayByPartitionAndBucket(data)); + ASSERT_EQ(1, batches.size()); + + ASSERT_OK_AND_ASSIGN( + auto helper2, + TestHelper::Create(dir->Str(), schema, partition_keys, primary_keys, options, + /*is_streaming_mode=*/true, /*ignore_if_exists=*/true)); + + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + + CHECK_HOOK_STATUS(helper2->write_->Write(std::move(batches[0])), i); + CHECK_HOOK_STATUS(helper2->write_->Compact(/*partition=*/{{"f1", "10"}}, /*bucket=*/1, + /*full_compaction=*/true), + i); + + Result>> commit_messages = + helper2->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier); + CHECK_HOOK_STATUS(commit_messages.status(), i); + CHECK_HOOK_STATUS(helper2->commit_->Commit(commit_messages.value(), commit_identifier), i); + + compaction_run_complete = true; + io_hook->Clear(); + + ASSERT_OK_AND_ASSIGN(std::optional latest_snapshot, helper2->LatestSnapshot()); + ASSERT_TRUE(latest_snapshot); + ASSERT_EQ(Snapshot::CommitKind::Compact(), latest_snapshot->GetCommitKind()); + break; + } + + ASSERT_TRUE(compaction_run_complete); +} + +TEST_P(PkCompactionInteTest, TestKeyValueTableStreamWriteFullCompaction) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + auto schema = arrow::schema(fields); + + std::vector primary_keys = {"f0", "f1", "f2"}; + std::vector partition_keys = {"f1"}; + auto file_format = GetParam(); + std::map options = {{Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f2"}, + {Options::FILE_SYSTEM, "local"}, + {Options::DELETION_VECTORS_ENABLED, "false"}}; + + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(dir_->Str(), schema, partition_keys, primary_keys, options, + /*is_streaming_mode=*/true)); + ASSERT_OK_AND_ASSIGN(std::optional> table_schema, + helper->LatestSchema()); + ASSERT_TRUE(table_schema); + auto gen = std::make_shared(table_schema.value(), pool_); + int64_t commit_identifier = 0; + PrepareSimpleKeyValueData(gen, helper.get(), &commit_identifier); + std::vector datas_4; + datas_4.push_back( + BinaryRowGenerator::GenerateRow({std::string("Lily"), 10, 0, 17.1}, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto batches_4, gen->SplitArrayByPartitionAndBucket(datas_4)); + ASSERT_EQ(1, batches_4.size()); + + ASSERT_OK(helper->write_->Write(std::move(batches_4[0]))); + ASSERT_OK(helper->write_->Compact(/*partition=*/{{"f1", "10"}}, /*bucket=*/1, + /*full_compaction=*/true)); + ASSERT_OK_AND_ASSIGN( + std::vector> commit_messages, + helper->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier)); + ASSERT_OK(helper->commit_->Commit(commit_messages, commit_identifier)); + ASSERT_OK_AND_ASSIGN(std::optional snapshot5, helper->LatestSnapshot()); + ASSERT_EQ(5, snapshot5.value().Id()); + ASSERT_EQ(9, snapshot5.value().TotalRecordCount().value()); + ASSERT_EQ(-2, snapshot5.value().DeltaRecordCount().value()); + ASSERT_EQ(Snapshot::CommitKind::Compact(), snapshot5.value().GetCommitKind()); + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_EQ(data_splits.size(), 3); + std::map, std::string> expected_datas; + expected_datas[std::make_pair("f1=10/", 0)] = R"([ +[0, "Alice", 10, 1, 11.1] +])"; + + expected_datas[std::make_pair("f1=10/", 1)] = R"([ +[0, "Alex", 10, 0, 16.1], +[0, "Bob", 10, 0, 12.1], +[0, "David", 10, 0, 17.1], +[0, "Emily", 10, 0, 15.1], +[0, "Lily", 10, 0, 17.1], +[0, "Tony", 10, 0, 14.1] +])"; + + expected_datas[std::make_pair("f1=20/", 0)] = R"([ +[0, "Lucy", 20, 1, 14.1], +[0, "Paul", 20, 1, null] +])"; + + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + + for (const auto& split : data_splits) { + auto split_impl = dynamic_cast(split.get()); + ASSERT_OK_AND_ASSIGN(std::string partition_str, + helper->PartitionStr(split_impl->Partition())); + auto iter = expected_datas.find(std::make_pair(partition_str, split_impl->Bucket())); + ASSERT_TRUE(iter != expected_datas.end()); + ASSERT_OK_AND_ASSIGN(bool success, + helper->ReadAndCheckResult(data_type, {split}, iter->second)); + ASSERT_TRUE(success); + } +} + +std::vector GetTestValuesForCompactionInteTest() { + std::vector values; + values.emplace_back("parquet"); +#ifdef PAIMON_ENABLE_ORC + values.emplace_back("orc"); +#endif +#ifdef PAIMON_ENABLE_LANCE + values.emplace_back("lance"); +#endif +#ifdef PAIMON_ENABLE_AVRO + values.emplace_back("avro"); +#endif + return values; +} + +INSTANTIATE_TEST_SUITE_P(FileFormat, PkCompactionInteTest, + ::testing::ValuesIn(GetTestValuesForCompactionInteTest())); + } // namespace paimon::test