-
Notifications
You must be signed in to change notification settings - Fork 4.1k
GH-37293: [C++][Parquet] Encoding: Add Benchmark for DELTA_BYTE_ARRAY #37641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
c99d49d
4feb5c0
d9825d2
c567ed6
9a8d100
e8e8766
557d0ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3300,14 +3300,19 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode | |
|
|
||
| void SetData(int num_values, const uint8_t* data, int len) override { | ||
| num_values_ = num_values; | ||
| decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); | ||
| if (decoder_) { | ||
| decoder_->Reset(data, len); | ||
| } else { | ||
| decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); | ||
| } | ||
| prefix_len_decoder_.SetDecoder(num_values, decoder_); | ||
|
|
||
| // get the number of encoded prefix lengths | ||
| int num_prefix = prefix_len_decoder_.ValidValuesCount(); | ||
| // call prefix_len_decoder_.Decode to decode all the prefix lengths. | ||
| // all the prefix lengths are buffered in buffered_prefix_length_. | ||
| PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t))); | ||
| PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t), | ||
| /*shrink_to_fit=*/false)); | ||
| int ret = prefix_len_decoder_.Decode( | ||
| reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix); | ||
| DCHECK_EQ(ret, num_prefix); | ||
|
|
@@ -3323,7 +3328,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode | |
|
|
||
| // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set | ||
| // to last_value_in_previous_page_ when decoding a new page(except the first page) | ||
| last_value_ = ""; | ||
| last_value_.clear(); | ||
| } | ||
|
|
||
| int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, | ||
|
|
@@ -3370,20 +3375,24 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode | |
| throw ParquetException("excess expansion in DELTA_BYTE_ARRAY"); | ||
| } | ||
| } | ||
| PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); | ||
| // TODO(mwish): Release the buffer if it is too large. | ||
| PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size, /*shrink_to_fit=*/false)); | ||
|
|
||
| string_view prefix{last_value_}; | ||
| uint8_t* data_ptr = buffered_data_->mutable_data(); | ||
| for (int i = 0; i < max_values; ++i) { | ||
| if (ARROW_PREDICT_FALSE(static_cast<size_t>(prefix_len_ptr[i]) > prefix.length())) { | ||
| throw ParquetException("prefix length too large in DELTA_BYTE_ARRAY"); | ||
| } | ||
| memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]); | ||
| // buffer[i] currently points to the string suffix | ||
| memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len); | ||
| buffer[i].ptr = data_ptr; | ||
| buffer[i].len += prefix_len_ptr[i]; | ||
| data_ptr += buffer[i].len; | ||
| // If the prefix length is zero, the prefix can be ignored. | ||
|
||
| if (prefix_len_ptr[i] != 0) { | ||
| memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]); | ||
| // buffer[i] currently points to the string suffix | ||
| memcpy(data_ptr + prefix_len_ptr[i], buffer[i].ptr, buffer[i].len); | ||
wgtmac marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| buffer[i].ptr = data_ptr; | ||
| buffer[i].len += prefix_len_ptr[i]; | ||
| data_ptr += buffer[i].len; | ||
| } | ||
| prefix = std::string_view{buffer[i]}; | ||
| } | ||
| prefix_len_offset_ += max_values; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -737,6 +737,107 @@ static void BM_DeltaLengthDecodingSpacedByteArray(benchmark::State& state) { | |
| BENCHMARK(BM_PlainDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments); | ||
| BENCHMARK(BM_DeltaLengthDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments); | ||
|
|
||
| void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, | ||
| int min_size, int max_size, double prefixed_probability) { | ||
| std::default_random_engine gen(seed); | ||
| std::uniform_int_distribution<int> dist_size(min_size, max_size); | ||
| std::uniform_int_distribution<int> dist_byte(0, 255); | ||
| std::bernoulli_distribution dist_has_prefix(prefixed_probability); | ||
| std::uniform_real_distribution<double> dist_prefix_length(0, 1); | ||
|
|
||
| for (int i = 0; i < n; ++i) { | ||
| int len = dist_size(gen); | ||
| out[i].len = len; | ||
| out[i].ptr = buf; | ||
|
|
||
| bool do_prefix = dist_has_prefix(gen) && i > 0; | ||
| int prefix_len = 0; | ||
| if (do_prefix) { | ||
| int max_prefix_len = std::min(len, static_cast<int>(out[i - 1].len)); | ||
| prefix_len = static_cast<int>(std::ceil(max_prefix_len * dist_prefix_length(gen))); | ||
| } | ||
| for (int j = 0; j < prefix_len; ++j) { | ||
| buf[j] = out[i - 1].ptr[j]; | ||
| } | ||
| for (int j = prefix_len; j < len; ++j) { | ||
| buf[j] = static_cast<uint8_t>(dist_byte(gen)); | ||
| } | ||
| buf += len; | ||
| } | ||
| } | ||
|
|
||
| static void BM_DeltaEncodingByteArray(benchmark::State& state) { | ||
| int32_t min_length = static_cast<int32_t>(state.range(0)); | ||
| int32_t max_length = static_cast<int32_t>(state.range(1)); | ||
| int32_t array_size = static_cast<int32_t>(state.range(2)); | ||
| double prefixed_probability = state.range(3) / 100; | ||
| auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY); | ||
| std::vector<ByteArray> values; | ||
| std::vector<uint8_t> buf(max_length * array_size); | ||
| values.resize(array_size); | ||
| prefixed_random_byte_array(array_size, /*seed=*/0, buf.data(), values.data(), | ||
|
||
| min_length, max_length, | ||
| /*prefixed_probability=*/prefixed_probability); | ||
| int64_t actual_length = 0; | ||
| for (auto v : values) { | ||
| actual_length += v.len; | ||
| } | ||
|
|
||
| for (auto _ : state) { | ||
| encoder->Put(values.data(), static_cast<int>(values.size())); | ||
| encoder->FlushValues(); | ||
| } | ||
| state.SetItemsProcessed(state.iterations() * array_size); | ||
| state.SetBytesProcessed(state.iterations() * actual_length); | ||
| } | ||
|
|
||
| static void BM_DeltaDecodingByteArray(benchmark::State& state) { | ||
| int32_t min_length = static_cast<int32_t>(state.range(0)); | ||
| int32_t max_length = static_cast<int32_t>(state.range(1)); | ||
| int32_t array_size = static_cast<int32_t>(state.range(2)); | ||
| double prefixed_probability = state.range(3) / 100; | ||
| auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY); | ||
| std::vector<ByteArray> values; | ||
| std::vector<uint8_t> input_buf(max_length * array_size); | ||
| values.resize(array_size); | ||
| prefixed_random_byte_array(array_size, /*seed=*/0, input_buf.data(), values.data(), | ||
| min_length, max_length, | ||
| /*prefixed_probability=*/prefixed_probability); | ||
| int64_t actual_length = 0; | ||
| for (auto v : values) { | ||
| actual_length += v.len; | ||
| } | ||
| encoder->Put(values.data(), static_cast<int>(values.size())); | ||
| std::shared_ptr<Buffer> buf = encoder->FlushValues(); | ||
|
|
||
| auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY); | ||
| for (auto _ : state) { | ||
| decoder->SetData(array_size, buf->data(), static_cast<int>(buf->size())); | ||
| decoder->Decode(values.data(), static_cast<int>(values.size())); | ||
| ::benchmark::DoNotOptimize(values); | ||
| } | ||
| state.SetItemsProcessed(state.iterations() * array_size); | ||
| state.SetBytesProcessed(state.iterations() * actual_length); | ||
| } | ||
|
|
||
| static void ByteArrayDeltaCustomArguments(benchmark::internal::Benchmark* b) { | ||
| for (int max_string_length : {8, 64, 1024}) { | ||
| for (int batch_size : {512, 2048}) { | ||
| std::vector<std::pair<int, int>> prefix_gen_params = { | ||
| {10, 0}, {90, max_string_length / 2}, {99, max_string_length}}; | ||
| for (auto& [prefixed_probability, min_prefix_string_length] : prefix_gen_params) { | ||
| b->Args({min_prefix_string_length, max_string_length, batch_size, | ||
| prefixed_probability}); | ||
| } | ||
| } | ||
| } | ||
| b->ArgNames({"min-prefix-string-length", "max-string-length", "batch-size", | ||
| "prefixed-probability"}); | ||
| } | ||
|
|
||
| BENCHMARK(BM_DeltaEncodingByteArray)->Apply(ByteArrayDeltaCustomArguments); | ||
| BENCHMARK(BM_DeltaDecodingByteArray)->Apply(ByteArrayDeltaCustomArguments); | ||
|
|
||
| static void BM_RleEncodingBoolean(benchmark::State& state) { | ||
| std::vector<bool> values(state.range(0), true); | ||
| auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::RLE); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this desirable? If decoding first a large page and then a small page, this means that memory wouldn't be released. Does page size always stay similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emmm Personally I don't think the page will be too large, but user might set large page-size, I'll remove that