Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class BitReader {
}

/// Gets the next value from the buffer. Returns true if 'v' could be read or false if
/// there are not enough bytes left. num_bits must be <= 32.
/// there are not enough bytes left.
template <typename T>
bool GetValue(int num_bits, T* v);

Expand All @@ -157,6 +157,10 @@ class BitReader {
template <typename T>
bool GetAligned(int num_bytes, T* v);

/// Advances the stream by a number of bits. Returns true on success or false if
/// there are not enough bits left.
bool Advance(int64_t num_bits);

/// Reads a vlq encoded int from the stream. The encoded int must start at
/// the beginning of a byte. Return false if there were not enough bytes in
/// the buffer.
Expand Down Expand Up @@ -423,6 +427,25 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
return true;
}

/// Advances the read position by `num_bits` bits and reloads buffered_values_
/// from the new byte position.
///
/// Returns true on success. Returns false, without modifying the reader, when
/// the requested bits do not fit in the bytes remaining before max_bytes_.
///
/// NOTE(review): num_bits is assumed to be non-negative; a negative value is
/// not rejected here — confirm against callers.
inline bool BitReader::Advance(int64_t num_bits) {
  const int64_t bits_required = bit_offset_ + num_bits;
  // Keep the byte count in 64 bits: narrowing it to int would let a very large
  // num_bits wrap around to a small value and slip past the bounds check.
  const int64_t bytes_required =
      static_cast<int64_t>(BitUtil::BytesForBits(bits_required));
  // Compare against the space that is left instead of adding to byte_offset_,
  // so the check itself cannot overflow.
  if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) {
    return false;
  }
  // The check above bounds bits_required / 8 by the remaining byte count, so
  // the narrowing casts below cannot truncate.
  byte_offset_ += static_cast<int>(bits_required >> 3);
  bit_offset_ = static_cast<int>(bits_required & 7);
  // Reset buffered_values_
  int bytes_remaining = max_bytes_ - byte_offset_;
  if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
  } else {
    // Fewer than 8 bytes left: copy only what exists in the buffer.
    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
  }
  buffered_values_ = arrow::BitUtil::FromLittleEndian(buffered_values_);
  return true;
}

inline bool BitWriter::PutVlqInt(uint32_t v) {
bool result = true;
while ((v & 0xFFFFFF80UL) != 0UL) {
Expand Down
38 changes: 38 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4192,10 +4192,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {

::arrow::AssertTablesEqual(*table, *expect_table);
}

// Reads the delta_byte_array.parquet data file through the Parquet FileReader
// and checks that the resulting table equals delta_byte_array_expect.csv, with
// every listed column parsed from the CSV as a nullable UTF-8 string.
// NOTE(review): the test is named DeltaLengthByteArray but the fixture is
// delta_byte_array.parquet — confirm which encoding this is meant to cover.
TEST(TestArrowReadDeltaEncoding, DeltaLengthByteArray) {
  auto file = test::get_data_file("delta_byte_array.parquet");
  auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
  auto pool = ::arrow::default_memory_pool();
  std::unique_ptr<FileReader> parquet_reader;
  std::shared_ptr<::arrow::Table> parquet_table;
  ASSERT_OK(
      FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader));
  ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
  ASSERT_OK_AND_ASSIGN(auto actual_table, parquet_table->CombineChunks());

  ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file));
  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
  // The array size must match the number of names: extra slots would be
  // value-initialized to "" and registered as a bogus column type below.
  const std::array<std::string, 9> column_names = {
      "c_customer_id", "c_salutation",          "c_first_name",
      "c_last_name",   "c_preferred_cust_flag", "c_birth_country",
      "c_login",       "c_email_address",       "c_last_review_date"};
  for (const auto& name : column_names) {
    convert_options.column_types[name] = ::arrow::utf8();
  }
  convert_options.strings_can_be_null = true;
  ASSERT_OK_AND_ASSIGN(auto csv_reader,
                       ::arrow::csv::TableReader::Make(
                           ::arrow::io::default_io_context(), input_file,
                           ::arrow::csv::ReadOptions::Defaults(),
                           ::arrow::csv::ParseOptions::Defaults(), convert_options));
  ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read());
  ASSERT_OK_AND_ASSIGN(auto expect_table, csv_table->CombineChunks());

  ::arrow::AssertTablesEqual(*actual_table, *expect_table);
}

#else
// Stub for the DeltaBinaryPacked test in the #else branch of the enclosing
// conditional: it does no work and only reports the test as skipped with the
// message below. (Presumably this branch is built when the CSV reader is not
// available — confirm against the matching #if.)
TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
GTEST_SKIP() << "Test needs CSV reader";
}

// Stub for the DeltaLengthByteArray test in the #else branch of the enclosing
// conditional: it does no work and only reports the test as skipped with the
// message below. (Presumably this branch is built when the CSV reader is not
// available — confirm against the matching #if.)
TEST(TestArrowReadDeltaEncoding, DeltaLengthByteArray) {
GTEST_SKIP() << "Test needs CSV reader";
}

#endif

} // namespace arrow
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,13 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::DELTA_BYTE_ARRAY: {
auto decoder = MakeTypedDecoder<DType>(Encoding::DELTA_BYTE_ARRAY, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
Expand Down
Loading