Skip to content

Commit 086d826

Browse files
[scan-opt-2] Optimize decode function and refine code (apache#157)
* Add parquet scan benchmark
* Add Usage
* perf report
* Add Optimize append
* Complete plaindecoder code and passed test
* Add code for DictDecoder
* Resume CMakeLists.txt
* Fix offset validate
* reduce buffer capacity
* Add Patch version
* Add Fix for write validate
* Set false for resize
* Fix customer case issue
* Add patch version
* Remove cout
* Add Function opt
* Add to DecodeArrowDense_opt
* clean comment
* Clean unnecessary paras
* Delete Patch version
* Modify Gbenchmark version to avoid conflict
* comment cout
1 parent 48029b9 commit 086d826

File tree

7 files changed

+28
-91
lines changed

7 files changed

+28
-91
lines changed

cpp/src/parquet/arrow/reader.cc

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -142,10 +142,6 @@ class FileReaderImpl : public FileReader {
142142
: pool_(pool),
143143
reader_(std::move(reader)),
144144
reader_properties_(std::move(properties)) {}
145-
146-
~FileReaderImpl() {
147-
std::cout << "Patch version-0830" << std::endl;
148-
}
149145

150146
Status Init() {
151147
return SchemaManifest::Make(reader_->metadata()->schema(),

cpp/src/parquet/arrow/reader_internal.cc

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -704,7 +704,10 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
704704
case ::arrow::Type::DATE64:
705705
RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
706706
break;
707-
case ::arrow::Type::FIXED_SIZE_BINARY:
707+
case ::arrow::Type::FIXED_SIZE_BINARY: {
708+
RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
709+
result = chunked_result;
710+
} break;
708711
case ::arrow::Type::BINARY:
709712
case ::arrow::Type::STRING:
710713
case ::arrow::Type::LARGE_BINARY:

cpp/src/parquet/column_reader.cc

Lines changed: 9 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -1570,39 +1570,27 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
15701570
}
15711571

15721572
void ReadValuesDense(int64_t values_to_read) override {
1573-
// int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
1574-
// static_cast<int>(values_to_read), &accumulator_);
15751573
int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
15761574
static_cast<int>(values_to_read), 0,
15771575
NULLPTR, (reinterpret_cast<int32_t *>(offset_->mutable_data()) + values_written_),
1578-
values_, 0, &accumulator_, &bianry_length_);
1576+
values_, 0, &bianry_length_);
15791577
DCHECK_EQ(num_decoded, values_to_read);
1580-
// ResetValues();
15811578
}
15821579

1583-
// void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
1584-
// int64_t num_decoded = this->current_decoder_->DecodeArrow(
1585-
// static_cast<int>(values_to_read), static_cast<int>(null_count),
1586-
// valid_bits_->mutable_data(), values_written_, &accumulator_);
1587-
// DCHECK_EQ(num_decoded, values_to_read - null_count);
1588-
// ResetValues();
1589-
// }
1590-
15911580
void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
15921581
int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
15931582
static_cast<int>(values_to_read), static_cast<int>(null_count),
15941583
valid_bits_->mutable_data(), (reinterpret_cast<int32_t *>(offset_->mutable_data()) + values_written_),
1595-
values_, values_written_, &accumulator_, &bianry_length_);
1584+
values_, values_written_, &bianry_length_);
15961585
DCHECK_EQ(num_decoded, values_to_read - null_count);
1597-
// ResetValues();
15981586
}
15991587

16001588
void ReserveValues(int64_t extra_values) {
16011589
const int64_t new_values_capacity =
16021590
UpdateCapacity(values_capacity_, values_written_, extra_values);
16031591
if (new_values_capacity > values_capacity_) {
16041592
PARQUET_THROW_NOT_OK(
1605-
values_->Resize(new_values_capacity * 20, false));
1593+
values_->Resize(new_values_capacity * binary_per_row_length_, false));
16061594
PARQUET_THROW_NOT_OK(
16071595
offset_->Resize((new_values_capacity+1) * 4, false));
16081596

@@ -1626,7 +1614,6 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
16261614

16271615
std::shared_ptr<ResizableBuffer> ReleaseValues() override {
16281616
auto result = values_;
1629-
// PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
16301617
values_ = AllocateBuffer(this->pool_);
16311618
values_capacity_ = 0;
16321619
return result;
@@ -1639,8 +1626,13 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
16391626
const auto first_offset = offsetArr[0];
16401627
const auto last_offset = offsetArr[values_written_];
16411628
int64_t binary_length = last_offset - first_offset;
1642-
// std::cout << "binary_length:" << binary_length << std::endl;
16431629
values_->SetSize(binary_length);
1630+
1631+
if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
1632+
binary_per_row_length_ = binary_length / values_written_ + 1;
1633+
// std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
1634+
hasCal_average_len_ = true;
1635+
}
16441636

16451637
offset_ = AllocateBuffer(this->pool_);
16461638
bianry_length_ = 0;
@@ -1667,9 +1659,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
16671659

16681660
int32_t bianry_length_ = 0;
16691661

1670-
// std::shared_ptr<::arrow::ResizableBuffer> values_;
16711662
std::shared_ptr<::arrow::ResizableBuffer> offset_;
1672-
// std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
16731663
};
16741664

16751665
class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,

cpp/src/parquet/column_reader.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
5454
// 16 KB is the default expected page header size
5555
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
5656

57+
static constexpr int32_t kDefaultBinaryPerRowSzie = 20;
58+
5759
class PARQUET_EXPORT LevelDecoder {
5860
public:
5961
LevelDecoder();
@@ -301,6 +303,9 @@ class RecordReader {
301303
int64_t levels_position_;
302304
int64_t levels_capacity_;
303305

306+
bool hasCal_average_len_ = false;
307+
int64_t binary_per_row_length_ = kDefaultBinaryPerRowSzie;
308+
304309
std::shared_ptr<::arrow::ResizableBuffer> values_;
305310
// In the case of false, don't allocate the values buffer (when we directly read into
306311
// builder classes).

cpp/src/parquet/encoding.cc

Lines changed: 9 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -1356,15 +1356,11 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
13561356
int32_t* offset,
13571357
std::shared_ptr<::arrow::ResizableBuffer> & values,
13581358
int64_t valid_bits_offset,
1359-
typename EncodingTraits<ByteArrayType>::Accumulator* out,
13601359
int32_t* bianry_length) {
13611360
int result = 0;
13621361
PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
13631362
offset, values,
1364-
valid_bits_offset, out, &result, bianry_length));
1365-
1366-
// PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
1367-
// valid_bits_offset, out, &result));
1363+
valid_bits_offset, &result, bianry_length));
13681364

13691365
return result;
13701366
}
@@ -1428,27 +1424,16 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
14281424
int32_t* offset,
14291425
std::shared_ptr<::arrow::ResizableBuffer> & values,
14301426
int64_t valid_bits_offset,
1431-
typename EncodingTraits<ByteArrayType>::Accumulator* out,
14321427
int* out_values_decoded,
14331428
int32_t* bianry_length) {
1434-
// ArrowBinaryHelper helper(out);
14351429
int values_decoded = 0;
1436-
1437-
1438-
1439-
// RETURN_NOT_OK(helper.builder->Reserve(num_values));
1440-
// RETURN_NOT_OK(helper.builder->ReserveData(
1441-
// std::min<int64_t>(len_, helper.chunk_space_remaining)));
1442-
14431430
auto dst_value = values->mutable_data() + (*bianry_length);
14441431
int capacity = values->capacity();
14451432
if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) {
14461433
values->Reserve(len_ + *bianry_length);
14471434
dst_value = values->mutable_data() + (*bianry_length);
14481435
}
14491436

1450-
1451-
14521437
int i = 0;
14531438
RETURN_NOT_OK(VisitNullBitmapInline(
14541439
valid_bits, valid_bits_offset, num_values, null_count,
@@ -1464,37 +1449,19 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
14641449
if (ARROW_PREDICT_FALSE(len_ < increment)) {
14651450
ParquetException::EofException();
14661451
}
1467-
// if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
1468-
// // This element would exceed the capacity of a chunk
1469-
// RETURN_NOT_OK(helper.PushChunk());
1470-
// RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
1471-
// RETURN_NOT_OK(helper.builder->ReserveData(
1472-
// std::min<int64_t>(len_, helper.chunk_space_remaining)));
1473-
// }
1474-
// helper.UnsafeAppend(data_ + 4, value_len);
14751452

14761453
(*bianry_length) += value_len;
14771454
offset[i+1] = offset[i] + value_len;
14781455
memcpy(dst_value, data_ + 4, value_len);
14791456
dst_value = dst_value + value_len;
14801457

1481-
// std::cout << "*(data_ + 4) :" << *(data_ + 4) << std::endl;
1482-
// std::cout << "*(data_ + 5) " << *(data_ + 5) << std::endl;
1483-
14841458
data_ += increment;
14851459
len_ -= increment;
1486-
1487-
// uint8_t* address = values->mutable_data();
1488-
// for(int i=0; i< 10; i++) {
1489-
// std::cout << "*(address+" << i << ")" << *(address+i) << std::endl;
1490-
// }
1491-
14921460
++values_decoded;
14931461
++i;
14941462
return Status::OK();
14951463
},
14961464
[&]() {
1497-
// helper.UnsafeAppendNull();
14981465
offset[i+1] = offset[i];
14991466
++i;
15001467
return Status::OK();
@@ -1962,23 +1929,18 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
19621929
int32_t* offset,
19631930
std::shared_ptr<::arrow::ResizableBuffer> & values,
19641931
int64_t valid_bits_offset,
1965-
typename EncodingTraits<ByteArrayType>::Accumulator* out,
19661932
int32_t* bianry_length) {
19671933
int result = 0;
19681934
if (null_count == 0) {
19691935
PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values,
1970-
offset, values,
1971-
out, &result, bianry_length));
1936+
offset, values, &result, bianry_length));
19721937
} else {
19731938
PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
19741939
offset, values,
1975-
valid_bits_offset, out, &result, bianry_length));
1940+
valid_bits_offset, &result, bianry_length));
19761941

19771942
}
19781943

1979-
// PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
1980-
// valid_bits_offset, out, &result));
1981-
19821944
return result;
19831945
}
19841946

@@ -2051,23 +2013,18 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
20512013
int32_t* offset,
20522014
std::shared_ptr<::arrow::ResizableBuffer> & values,
20532015
int64_t valid_bits_offset,
2054-
typename EncodingTraits<ByteArrayType>::Accumulator* out,
20552016
int* out_num_values,
20562017
int32_t* bianry_length) {
20572018
constexpr int32_t kBufferSize = 1024;
20582019
int32_t indices[kBufferSize];
2059-
2060-
// ArrowBinaryHelper helper(out);
2061-
20622020
auto dst_value = values->mutable_data() + (*bianry_length);
20632021

2064-
2065-
20662022
::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
20672023

20682024
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
20692025
int values_decoded = 0;
20702026
int num_appended = 0;
2027+
uint64_t capacity = values->capacity();
20712028
while (num_appended < num_values) {
20722029
bool is_valid = bit_reader.IsSet();
20732030
bit_reader.Next();
@@ -2086,16 +2043,11 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
20862043
// Consume all indices
20872044
if (is_valid) {
20882045
auto idx = indices[i];
2089-
RETURN_NOT_OK(IndexInBounds(idx));
2090-
const auto& val = dict_values[idx];
2091-
// if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
2092-
// RETURN_NOT_OK(helper.PushChunk());
2093-
// }
2094-
// RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
2095-
2046+
// RETURN_NOT_OK(IndexInBounds(idx));
2047+
const auto& val = dict_values[idx];
20962048
auto value_len = val.len;
20972049
auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len;
2098-
uint64_t capacity = values->capacity();
2050+
20992051
if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
21002052
capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
21012053
values->Reserve(capacity);
@@ -2109,7 +2061,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
21092061
++i;
21102062
++values_decoded;
21112063
} else {
2112-
// RETURN_NOT_OK(helper.AppendNull());
21132064
offset[num_appended+1] = offset[num_appended];
21142065
--null_count;
21152066
}
@@ -2123,7 +2074,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
21232074
bit_reader.Next();
21242075
}
21252076
} else {
2126-
// RETURN_NOT_OK(helper.AppendNull());
21272077
offset[num_appended+1] = offset[num_appended];
21282078
--null_count;
21292079
++num_appended;
@@ -2165,13 +2115,13 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
21652115
Status DecodeArrowDenseNonNull_opt(int num_values,
21662116
int32_t* offset,
21672117
std::shared_ptr<::arrow::ResizableBuffer> & values,
2168-
typename EncodingTraits<ByteArrayType>::Accumulator* out,
21692118
int* out_num_values,
21702119
int32_t* bianry_length) {
21712120

21722121
constexpr int32_t kBufferSize = 2048;
21732122
int32_t indices[kBufferSize];
21742123
int values_decoded = 0;
2124+
uint64_t capacity = values->capacity();
21752125

21762126
// ArrowBinaryHelper helper(out);
21772127
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
@@ -2185,16 +2135,10 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
21852135
if (num_indices == 0) ParquetException::EofException();
21862136
for (int i = 0; i < num_indices; ++i) {
21872137
auto idx = indices[i];
2188-
RETURN_NOT_OK(IndexInBounds(idx));
2138+
// RETURN_NOT_OK(IndexInBounds(idx));
21892139
const auto& val = dict_values[idx];
2190-
// if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
2191-
// RETURN_NOT_OK(helper.PushChunk());
2192-
// }
2193-
// RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
2194-
21952140
auto value_len = val.len;
21962141
auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len;
2197-
uint64_t capacity = values->capacity();
21982142
if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
21992143
capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
22002144
values->Reserve(capacity);

cpp/src/parquet/encoding.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -321,7 +321,6 @@ class TypedDecoder : virtual public Decoder {
321321
int32_t* offset,
322322
std::shared_ptr<::arrow::ResizableBuffer> & values,
323323
int64_t valid_bits_offset,
324-
typename EncodingTraits<ByteArrayType>::Accumulator* out,
325324
int32_t* bianry_length) {
326325
return 0;
327326
}

cpp/thirdparty/versions.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ ARROW_BOOST_BUILD_VERSION=1.75.0
3333
ARROW_BROTLI_BUILD_VERSION=v1.0.9
3434
ARROW_BZIP2_BUILD_VERSION=1.0.8
3535
ARROW_CARES_BUILD_VERSION=1.17.1
36-
ARROW_GBENCHMARK_BUILD_VERSION=v1.5.2
36+
ARROW_GBENCHMARK_BUILD_VERSION=v1.6.0
3737
ARROW_GFLAGS_BUILD_VERSION=v2.2.2
3838
ARROW_GLOG_BUILD_VERSION=v0.4.0
3939
ARROW_GRPC_BUILD_VERSION=v1.35.0

0 commit comments

Comments
 (0)