26 changes: 15 additions & 11 deletions cpp/src/parquet/column_io_benchmark.cc
@@ -21,6 +21,7 @@
#include "arrow/io/memory.h"
#include "arrow/testing/random.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"

#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
@@ -167,27 +168,29 @@ std::shared_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,

static void BM_ReadInt64Column(::benchmark::State& state, Repetition::type repetition,
Compression::type codec, Encoding::type encoding) {
-format::ColumnChunk thrift_metadata;
+const auto kNumValues = state.range(0);
+const auto kBatchSize = state.range(1);

::arrow::random::RandomArrayGenerator rgen(1337);
-auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
+auto values = rgen.Int64(kNumValues, 0, 1000000, 0);
const auto& int64_values = static_cast<const ::arrow::Int64Array&>(*values);

-std::vector<int16_t> definition_levels(state.range(0), 1);
-std::vector<int16_t> repetition_levels(state.range(0), 0);
+std::vector<int16_t> definition_levels(kNumValues, 1);
+std::vector<int16_t> repetition_levels(kNumValues, 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
.compression(codec)
->encoding(encoding)
->disable_dictionary()
->build();

+format::ColumnChunk thrift_metadata;
auto metadata = ColumnChunkMetaDataBuilder::Make(
properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));

auto stream = CreateOutputStream();
std::shared_ptr<Int64Writer> writer = BuildWriter(
-state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec);
+kNumValues, stream, metadata.get(), schema.get(), properties.get(), codec);
writer->WriteBatch(int64_values.length(), definition_levels.data(),
repetition_levels.data(), int64_values.raw_values());
writer->Close();
@@ -196,16 +199,17 @@ static void BM_ReadInt64Column(::benchmark::State& state, Repetition::type repet
int64_t stream_size = src->size();
int64_t data_size = int64_values.length() * sizeof(int64_t);

-std::vector<int64_t> values_out(state.range(1));
-std::vector<int16_t> definition_levels_out(state.range(1));
-std::vector<int16_t> repetition_levels_out(state.range(1));
+std::vector<int64_t> values_out(kBatchSize);
+std::vector<int16_t> definition_levels_out(kBatchSize);
+std::vector<int16_t> repetition_levels_out(kBatchSize);
while (state.KeepRunning()) {
std::shared_ptr<Int64Reader> reader =
-BuildReader(src, state.range(1), codec, schema.get());
+BuildReader(src, kNumValues, codec, schema.get());
Comment on lines 206 to 207

Member (Author):

This is the fix: we were instantiating the ColumnReader with the read batch size (state.range(1)) instead of the total number of values (state.range(0)).

Member:

Good catch!
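For readers outside the Arrow codebase, here is a minimal, self-contained sketch of why the mix-up matters. It does not use the real parquet-cpp API: FakeReader, its ReadBatch signature, and the constants are invented stand-ins for the benchmark's Int64Reader, state.range(0), and state.range(1). If the reader is told its total is only the batch size, the benchmark's read loop cannot make progress past the first batch:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Stand-in for a column reader created for a declared total of `total_values`.
class FakeReader {
 public:
  explicit FakeReader(int64_t total_values) : remaining_(total_values) {}

  // Mimics ReadBatch(): fills `out` with up to `batch_size` values and
  // returns how many were actually read (0 once the declared total is hit).
  int64_t ReadBatch(int64_t batch_size, std::vector<int64_t>* out) {
    const int64_t n = std::min(batch_size, remaining_);
    out->assign(static_cast<size_t>(n), 0);
    remaining_ -= n;
    return n;
  }

 private:
  int64_t remaining_;
};

int main() {
  const int64_t kNumValues = 1024;  // plays the role of state.range(0)
  const int64_t kBatchSize = 16;    // plays the role of state.range(1)

  // Buggy variant: FakeReader reader(kBatchSize);
  // The reader would report end-of-column after one batch, so the loop below
  // would spin with values_read == 0 and i would never reach kNumValues.
  FakeReader reader(kNumValues);  // fixed: total value count, not batch size

  std::vector<int64_t> values_out;
  int64_t values_read = 0;
  for (int64_t i = 0; i < kNumValues; i += values_read) {
    values_read = reader.ReadBatch(kBatchSize, &values_out);
    if (values_read == 0) {  // same guard the patch adds via ARROW_CHECK_NE
      std::cerr << "Unexpected end of column\n";
      return 1;
    }
  }
  std::cout << "read all " << kNumValues << " values\n";
  return 0;
}
```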

int64_t values_read = 0;
-for (int64_t i = 0; i < int64_values.length(); i += values_read) {
-reader->ReadBatch(values_out.size(), definition_levels_out.data(),
+for (int64_t i = 0; i < kNumValues; i += values_read) {
+reader->ReadBatch(kBatchSize, definition_levels_out.data(),
repetition_levels_out.data(), values_out.data(), &values_read);
+ARROW_CHECK_NE(values_read, 0) << "Unexpected end of column";
}
}
SetBytesProcessed(state, repetition);