-
Notifications
You must be signed in to change notification settings - Fork 4.1k
GH-37293: [C++][Parquet] Encoding: Add Benchmark for DELTA_BYTE_ARRAY #37641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
c99d49d
4feb5c0
d9825d2
c567ed6
9a8d100
e8e8766
557d0ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3300,14 +3300,19 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode | |||||
|
|
||||||
| void SetData(int num_values, const uint8_t* data, int len) override { | ||||||
| num_values_ = num_values; | ||||||
| decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); | ||||||
| if (decoder_) { | ||||||
| decoder_->Reset(data, len); | ||||||
| } else { | ||||||
| decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len); | ||||||
| } | ||||||
| prefix_len_decoder_.SetDecoder(num_values, decoder_); | ||||||
|
|
||||||
| // get the number of encoded prefix lengths | ||||||
| int num_prefix = prefix_len_decoder_.ValidValuesCount(); | ||||||
| // call prefix_len_decoder_.Decode to decode all the prefix lengths. | ||||||
| // all the prefix lengths are buffered in buffered_prefix_length_. | ||||||
| PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t))); | ||||||
| PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t), | ||||||
| /*shrink_to_fit=*/false)); | ||||||
| int ret = prefix_len_decoder_.Decode( | ||||||
| reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix); | ||||||
| DCHECK_EQ(ret, num_prefix); | ||||||
|
|
@@ -3323,7 +3328,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode | |||||
|
|
||||||
| // TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set | ||||||
| // to last_value_in_previous_page_ when decoding a new page(except the first page) | ||||||
| last_value_ = ""; | ||||||
| last_value_.clear(); | ||||||
| } | ||||||
|
|
||||||
| int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, | ||||||
|
|
@@ -3370,20 +3375,23 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public TypedDecode | |||||
| throw ParquetException("excess expansion in DELTA_BYTE_ARRAY"); | ||||||
| } | ||||||
| } | ||||||
| PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size)); | ||||||
| PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size, false)); | ||||||
|
||||||
| PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size, false)); | |
| PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size, /*shrink_to_fit=*/false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, we lose the chance to release bulk memory if shrink_to_fit is false. Changing the default behavior of buffered_prefix_length_ above may not make a big difference, but the actual size of buffered_data_ may vary a lot during decoding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me leave it as is for now — maybe we need a better method for releasing memory?
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a (maybe-unused) optimization. When prefix == 0, it avoids a round of memcpy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make benchmarks better or worse?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better, of course, especially when many values have prefix == 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pitrou I can separate a PR for this. I think we can optimize two cases:
- Prefix == 0
- Postfix == 0
Each of these cases can be optimized to avoid copying and memory allocation. I can split that out into a separate PR.
wgtmac marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -737,6 +737,90 @@ static void BM_DeltaLengthDecodingSpacedByteArray(benchmark::State& state) { | |
| BENCHMARK(BM_PlainDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments); | ||
| BENCHMARK(BM_DeltaLengthDecodingSpacedByteArray)->Apply(ByteArrayCustomArguments); | ||
|
|
||
| void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, | ||
| int min_size, int max_size, double prefixed_probability) { | ||
| std::default_random_engine gen(seed); | ||
| std::uniform_int_distribution<int> dist_size(min_size, max_size); | ||
| std::uniform_int_distribution<int> dist_byte(0, 255); | ||
| std::bernoulli_distribution dist_has_prefix(prefixed_probability); | ||
| std::uniform_real_distribution<double> dist_prefix_length(0, 1); | ||
|
|
||
| for (int i = 0; i < n; ++i) { | ||
| int len = dist_size(gen); | ||
| out[i].len = len; | ||
| out[i].ptr = buf; | ||
|
|
||
| bool do_prefix = dist_has_prefix(gen) && i > 0; | ||
| int prefix_len = 0; | ||
| if (do_prefix) { | ||
| int max_prefix_len = std::min(len, static_cast<int>(out[i - 1].len)); | ||
| prefix_len = static_cast<int>(std::ceil(max_prefix_len * dist_prefix_length(gen))); | ||
| } | ||
| for (int j = 0; j < prefix_len; ++j) { | ||
| buf[j] = out[i - 1].ptr[j]; | ||
| } | ||
| for (int j = prefix_len; j < len; ++j) { | ||
| buf[j] = static_cast<uint8_t>(dist_byte(gen)); | ||
| } | ||
| buf += len; | ||
| } | ||
| } | ||
|
|
||
| static void BM_DeltaEncodingByteArray(benchmark::State& state) { | ||
| // Using arrow generator to generate random data. | ||
| int32_t max_length = static_cast<int32_t>(state.range(0)); | ||
| int32_t array_size = static_cast<int32_t>(state.range(1)); | ||
| auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY); | ||
| std::vector<ByteArray> values; | ||
| std::vector<uint8_t> buf(max_length * array_size); | ||
| values.resize(array_size); | ||
| prefixed_random_byte_array(array_size, /*seed=*/0, buf.data(), values.data(), | ||
|
||
| /*min_size=*/0, max_length, | ||
| /*prefixed_probability=*/0.5); | ||
| int64_t actual_length = 0; | ||
| for (auto v : values) { | ||
| actual_length += v.len; | ||
| } | ||
|
|
||
| for (auto _ : state) { | ||
| encoder->Put(values.data(), static_cast<int>(values.size())); | ||
| encoder->FlushValues(); | ||
| } | ||
| state.SetItemsProcessed(state.iterations() * array_size); | ||
| state.SetBytesProcessed(state.iterations() * actual_length); | ||
| } | ||
|
|
||
| static void BM_DeltaDecodingByteArray(benchmark::State& state) { | ||
| // Using arrow generator to generate random data. | ||
| int32_t max_length = static_cast<int32_t>(state.range(0)); | ||
| int32_t array_size = static_cast<int32_t>(state.range(1)); | ||
| auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY); | ||
| std::vector<ByteArray> values; | ||
| std::vector<uint8_t> input_buf(max_length * array_size); | ||
| values.resize(array_size); | ||
| prefixed_random_byte_array(array_size, /*seed=*/0, input_buf.data(), values.data(), | ||
| /*min_size=*/0, max_length, | ||
| /*prefixed_probability=*/0.5); | ||
| int64_t actual_length = 0; | ||
| for (auto v : values) { | ||
| actual_length += v.len; | ||
| } | ||
| encoder->Put(values.data(), static_cast<int>(values.size())); | ||
| std::shared_ptr<Buffer> buf = encoder->FlushValues(); | ||
|
|
||
| auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY); | ||
| for (auto _ : state) { | ||
| decoder->SetData(array_size, buf->data(), static_cast<int>(buf->size())); | ||
| decoder->Decode(values.data(), static_cast<int>(values.size())); | ||
| ::benchmark::DoNotOptimize(values); | ||
| } | ||
| state.SetItemsProcessed(state.iterations() * array_size); | ||
| state.SetBytesProcessed(state.iterations() * actual_length); | ||
| } | ||
|
|
||
| BENCHMARK(BM_DeltaEncodingByteArray)->Apply(ByteArrayCustomArguments); | ||
| BENCHMARK(BM_DeltaDecodingByteArray)->Apply(ByteArrayCustomArguments); | ||
|
|
||
| static void BM_RleEncodingBoolean(benchmark::State& state) { | ||
| std::vector<bool> values(state.range(0), true); | ||
| auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::RLE); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this desirable? If decoding first a large page and then a small page, this means that memory wouldn't be released. Does page size always stay similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, personally I don't think the page will be too large, but a user might set a large page size. I'll remove that.