Skip to content
97 changes: 90 additions & 7 deletions cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include <string.h>

#include <algorithm>
#include <cstdint>

Expand Down Expand Up @@ -77,6 +78,15 @@ class BitWriter {
// Writes an int zigzag encoded.
bool PutZigZagVlqInt(int32_t v);

/// Write a Vlq encoded int64 to the buffer. Returns false if there was not enough
/// room. The value is written byte aligned.
/// For more details on vlq:
/// en.wikipedia.org/wiki/Variable-length_quantity
bool PutVlqInt(uint64_t v);

// Writes an int64 zigzag encoded.
bool PutZigZagVlqInt(int64_t v);

/// Get a pointer to the next aligned byte and advance the underlying buffer
/// by num_bytes.
/// Returns NULL if there was not enough space.
Expand Down Expand Up @@ -155,6 +165,14 @@ class BitReader {
// Reads a zigzag encoded int into `v`.
bool GetZigZagVlqInt(int32_t* v);

/// Reads a vlq encoded int64 from the stream. The encoded int must start at
/// the beginning of a byte. Return false if there were not enough bytes in
/// the buffer.
bool GetVlqInt(uint64_t* v);

// Reads a zigzag encoded int64 into `v`.
bool GetZigZagVlqInt(int64_t* v);

/// Returns the number of bytes left in the stream, not including the current
/// byte (i.e., there may be an additional fraction of a byte).
int bytes_left() {
Expand All @@ -165,6 +183,9 @@ class BitReader {
/// Maximum byte length of a vlq encoded int
static constexpr int kMaxVlqByteLength = 5;

/// Maximum byte length of a vlq encoded int64
static constexpr int kMaxVlqByteLengthForInt64 = 10;

private:
const uint8_t* buffer_;
int max_bytes_;
Expand Down Expand Up @@ -263,8 +284,10 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
#pragma warning(disable : 4800 4805)
#endif
// Read bits of v that crossed into new buffered_values_
*v = *v | static_cast<T>(BitUtil::TrailingBits(*buffered_values, *bit_offset)
<< (num_bits - *bit_offset));
if (ARROW_PREDICT_TRUE(num_bits - *bit_offset < 64)) {
*v = *v | static_cast<T>(BitUtil::TrailingBits(*buffered_values, *bit_offset)
<< (num_bits - *bit_offset));
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
Expand All @@ -282,8 +305,6 @@ inline bool BitReader::GetValue(int num_bits, T* v) {
template <typename T>
inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
DCHECK(buffer_ != NULL);
// TODO: revisit this limit if necessary
DCHECK_LE(num_bits, 32);
DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));

int bit_offset = bit_offset_;
Expand Down Expand Up @@ -313,7 +334,18 @@ inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
reinterpret_cast<uint32_t*>(v + i), batch_size - i, num_bits);
i += num_unpacked;
byte_offset += num_unpacked * num_bits / 8;
} else if (sizeof(T) == 8 && num_bits > 32) {
// Use unpack64 only if num_bits is larger than 32
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't noticed previously, but why? Is it just a performance issue?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it is just a performance issue. If num_bits is no larger than 32, I guess that using unpack32 will achieve better performance with functions such as unpack32_avx2/unpack32_avx512.
The results of using unpack32 and unpack64 are the same if num_bits <= 32.

// TODO: improve the performance of internal::unpack64 and remove the restriction of
// num_bits
int num_unpacked =
internal::unpack64(buffer + byte_offset, reinterpret_cast<uint64_t*>(v + i),
batch_size - i, num_bits);
i += num_unpacked;
byte_offset += num_unpacked * num_bits / 8;
} else {
// TODO: revisit this limit if necessary
DCHECK_LE(num_bits, 32);
const int buffer_size = 1024;
uint32_t unpack_buffer[buffer_size];
while (i < batch_size) {
Expand Down Expand Up @@ -418,14 +450,65 @@ inline bool BitReader::GetVlqInt(uint32_t* v) {
}

// Zigzag-encodes the signed 32-bit value `v` and writes it through
// PutVlqInt, returning that call's result.
//
// NOTE(review): this body appears to come from a rendered diff. The first two
// statements look like the pre-change implementation and the four statements
// after them look like its replacement; as written, `u_v` is declared twice and
// everything after the first `return` is unreachable. Confirm against the
// real header before relying on either variant.
inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
// Presumably the earlier variant: both shifts operate on the unsigned copy, so
// `u_v >> 31` can only be 0 or 1 and the XOR touches bit 0 alone.
auto u_v = ::arrow::util::SafeCopy<uint32_t>(v);
return PutVlqInt((u_v << 1) ^ (u_v >> 31));
// Presumably the later variant: `v >> 31` shifts the signed input, so (with an
// arithmetic shift) it is all ones for negative `v` and the XOR inverts every bit.
uint32_t u_v = ::arrow::util::SafeCopy<uint32_t>(v);
v = (u_v << 1) ^ (v >> 31);
// Reinterpret the signed result as unsigned before handing it to PutVlqInt.
u_v = ::arrow::util::SafeCopy<uint32_t>(v);
return PutVlqInt(u_v);
}

// Reads a VLQ value via GetVlqInt and zigzag-decodes it into `*v`.
// Returns false without writing `*v` when GetVlqInt fails, true otherwise.
//
// NOTE(review): this body appears to come from a rendered diff. The first
// assignment to `*v` looks like the pre-change decode and the four statements
// after it look like its replacement (they overwrite `*v` unconditionally).
// Confirm against the real header.
inline bool BitReader::GetZigZagVlqInt(int32_t* v) {
uint32_t u;
if (!GetVlqInt(&u)) return false;
// Presumably the earlier variant: XORs the shifted value with `u << 31`, which
// can only affect the top bit.
*v = ::arrow::util::SafeCopy<int32_t>((u >> 1) ^ (u << 31));
// Presumably the later variant starts here: reinterpret the raw bits as signed.
*v = ::arrow::util::SafeCopy<int32_t>(u);
// Move bit 0 (the zigzag sign flag) into the sign position ...
int32_t temp = ::arrow::util::SafeCopy<int32_t>(u << 31);
// ... smear it across the word (assuming an arithmetic right shift), use it to
// conditionally invert the raw value, then drop the flag bit.
temp = ((temp >> 31) ^ *v) >> 1;
// XOR with the raw value's top bit, which undoes the sign extension the signed
// `>> 1` above introduced.
*v = temp ^ (*v & (1 << 31));
return true;
}

/// Writes `v` as a variable-length quantity: seven payload bits per byte, least
/// significant group first, with the high bit set on every byte except the last.
///
/// Each byte goes through PutAligned; the return value is true only if every
/// one of those writes succeeded. Writing continues even after a failed write.
inline bool BitWriter::PutVlqInt(uint64_t v) {
  constexpr uint64_t kPayloadMask = 0x7F;
  constexpr uint8_t kContinuationBit = 0x80;

  bool all_written = true;
  // Emit continuation bytes while more than seven significant bits remain.
  for (; v > kPayloadMask; v >>= 7) {
    const auto chunk = static_cast<uint8_t>((v & kPayloadMask) | kContinuationBit);
    all_written &= PutAligned<uint8_t>(chunk, 1);
  }
  // Final byte: the remaining bits fit in the payload, continuation bit clear.
  const auto last = static_cast<uint8_t>(v & kPayloadMask);
  all_written &= PutAligned<uint8_t>(last, 1);
  return all_written;
}

/// Decodes a variable-length quantity into `*v`, consuming at most
/// kMaxVlqByteLengthForInt64 bytes via GetAligned.
///
/// Returns false, leaving `*v` unmodified, if a byte cannot be read or if no
/// terminating byte (high bit clear) shows up within that limit.
inline bool BitReader::GetVlqInt(uint64_t* v) {
  uint64_t accumulated = 0;
  int shift = 0;

  for (int consumed = 0; consumed < kMaxVlqByteLengthForInt64; ++consumed, shift += 7) {
    uint8_t current = 0;
    if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &current))) {
      return false;
    }

    // Low seven bits are payload; groups arrive least significant first.
    const uint64_t payload = static_cast<uint64_t>(current & 0x7F);
    accumulated |= payload << shift;

    // A clear high bit marks the final byte of the encoding.
    const bool is_terminator = (current & 0x80) == 0;
    if (is_terminator) {
      *v = accumulated;
      return true;
    }
  }

  // Ran through the maximum length without seeing a terminating byte.
  return false;
}

/// Zigzag-encodes the signed 64-bit value `v` and writes the result with
/// PutVlqInt, returning that call's result.
inline bool BitWriter::PutZigZagVlqInt(int64_t v) {
  const uint64_t raw_bits = ::arrow::util::SafeCopy<uint64_t>(v);
  // Shifting the signed input right by 63 yields the fill pattern that the XOR
  // below combines with the doubled raw bits.
  const int64_t sign_fill = v >> 63;
  const int64_t zigzag = static_cast<int64_t>((raw_bits << 1) ^ sign_fill);
  const uint64_t encoded = ::arrow::util::SafeCopy<uint64_t>(zigzag);
  return PutVlqInt(encoded);
}

/// Reads a VLQ value via GetVlqInt and zigzag-decodes it into `*v`.
///
/// Returns false, leaving `*v` unmodified, when GetVlqInt fails; returns true
/// otherwise.
inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
  uint64_t u = 0;
  if (!GetVlqInt(&u)) return false;
  // Zigzag decode: bit 0 is the sign flag and the remaining bits hold the
  // magnitude. `0 - (u & 1)` is all ones when the flag is set and zero
  // otherwise, so the XOR conditionally inverts the shifted magnitude.
  // Staying in unsigned arithmetic avoids right-shifting negative signed values
  // and shifting a 1 into the sign bit.
  const uint64_t sign_mask = uint64_t{0} - (u & 1);
  *v = ::arrow::util::SafeCopy<int64_t>((u >> 1) ^ sign_mask);
  return true;
}

Expand Down
69 changes: 61 additions & 8 deletions cpp/src/arrow/util/bit_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1939,24 +1939,77 @@ TEST(BitUtil, RoundUpToPowerOf2) {
#undef U64
#undef S64

// Test helper: zigzag/VLQ-encodes `v` into a local buffer, compares the five
// encoded bytes with `buffer_expect`, then decodes and checks the round trip.
//
// NOTE(review): the two consecutive signature lines below look like the old
// and new versions of this helper as shown in a rendered diff; only the
// two-parameter form matches the body, which reads `buffer_expect`. Confirm
// against the real test file.
static void TestZigZag(int32_t v) {
static void TestZigZag(int32_t v, uint8_t* buffer_expect) {
// Zero-initialized so bytes the writer does not touch compare equal to 0.
uint8_t buffer[BitUtil::BitReader::kMaxVlqByteLength] = {};
// Writer and reader are both constructed over the same local buffer.
BitUtil::BitWriter writer(buffer, sizeof(buffer));
BitUtil::BitReader reader(buffer, sizeof(buffer));
writer.PutZigZagVlqInt(v);
// Byte-for-byte check of the encoded form (kMaxVlqByteLength is 5).
EXPECT_EQ(buffer_expect[0], buffer[0]);
EXPECT_EQ(buffer_expect[1], buffer[1]);
EXPECT_EQ(buffer_expect[2], buffer[2]);
EXPECT_EQ(buffer_expect[3], buffer[3]);
EXPECT_EQ(buffer_expect[4], buffer[4]);
// Decoding must reproduce the original value.
int32_t result;
EXPECT_TRUE(reader.GetZigZagVlqInt(&result));
EXPECT_EQ(v, result);
}

// Round-trips a set of int32 values through zigzag + VLQ encoding and checks
// the exact encoded bytes for each.
//
// NOTE(review): the single-argument TestZigZag calls directly below look like
// the pre-change test body from a rendered diff, and the expectation arrays
// plus two-argument calls after them look like its replacement. Confirm
// against the real test file.
TEST(BitStreamUtil, ZigZag) {
TestZigZag(0);
TestZigZag(1);
TestZigZag(1234);
TestZigZag(-1);
TestZigZag(-1234);
TestZigZag(std::numeric_limits<int32_t>::max());
TestZigZag(-std::numeric_limits<int32_t>::max());
// Expected encodings, one array per value, in the same order as the calls below.
uint8_t buffer_expect0[5] = {0, 0, 0, 0, 0};
uint8_t buffer_expect1[5] = {2, 0, 0, 0, 0};
uint8_t buffer_expect2[5] = {164, 19, 0, 0, 0};
uint8_t buffer_expect3[5] = {1, 0, 0, 0, 0};
uint8_t buffer_expect4[5] = {163, 19, 0, 0, 0};
uint8_t buffer_expect5[5] = {254, 255, 255, 255, 15};
uint8_t buffer_expect6[5] = {253, 255, 255, 255, 15};
uint8_t buffer_expect7[5] = {255, 255, 255, 255, 15};
TestZigZag(0, buffer_expect0);
TestZigZag(1, buffer_expect1);
TestZigZag(1234, buffer_expect2);
TestZigZag(-1, buffer_expect3);
TestZigZag(-1234, buffer_expect4);
// Extremes of the int32 range, including min(), which has no positive counterpart.
TestZigZag(std::numeric_limits<int32_t>::max(), buffer_expect5);
TestZigZag(-std::numeric_limits<int32_t>::max(), buffer_expect6);
TestZigZag(std::numeric_limits<int32_t>::min(), buffer_expect7);
}

static void TestZigZag64(int64_t v, uint8_t* buffer_expect) {
uint8_t buffer[BitUtil::BitReader::kMaxVlqByteLengthForInt64] = {};
BitUtil::BitWriter writer(buffer, sizeof(buffer));
BitUtil::BitReader reader(buffer, sizeof(buffer));
writer.PutZigZagVlqInt(v);
EXPECT_EQ(buffer_expect[0], buffer[0]);
EXPECT_EQ(buffer_expect[1], buffer[1]);
EXPECT_EQ(buffer_expect[2], buffer[2]);
EXPECT_EQ(buffer_expect[3], buffer[3]);
EXPECT_EQ(buffer_expect[4], buffer[4]);
EXPECT_EQ(buffer_expect[5], buffer[5]);
EXPECT_EQ(buffer_expect[6], buffer[6]);
EXPECT_EQ(buffer_expect[7], buffer[7]);
EXPECT_EQ(buffer_expect[8], buffer[8]);
EXPECT_EQ(buffer_expect[9], buffer[9]);
int64_t result;
EXPECT_TRUE(reader.GetZigZagVlqInt(&result));
EXPECT_EQ(v, result);
}

// Round-trips zero, small values of both signs, and the extremes of int64_t
// through zigzag + VLQ encoding, checking the exact encoded bytes for each.
TEST(BitStreamUtil, ZigZag64) {
  struct ZigZagCase {
    int64_t value;
    uint8_t encoded[10];  // entries not listed in an initializer are zero
  };

  ZigZagCase cases[] = {
      {0, {0}},
      {1, {2}},
      {1234, {164, 19}},
      {-1, {1}},
      {-1234, {163, 19}},
      {std::numeric_limits<int64_t>::max(),
       {254, 255, 255, 255, 255, 255, 255, 255, 255, 1}},
      {-std::numeric_limits<int64_t>::max(),
       {253, 255, 255, 255, 255, 255, 255, 255, 255, 1}},
      {std::numeric_limits<int64_t>::min(),
       {255, 255, 255, 255, 255, 255, 255, 255, 255, 1}},
  };

  for (ZigZagCase& test_case : cases) {
    TestZigZag64(test_case.value, test_case.encoded);
  }
}

TEST(BitUtil, RoundTripLittleEndianTest) {
Expand Down
Loading