Skip to content
Merged
8 changes: 7 additions & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "commands/command_parser.h"
#include "error_constants.h"
#include "server/server.h"
#include "status.h"
#include "types/redis_bitmap.h"

namespace redis {
Expand Down Expand Up @@ -171,6 +172,10 @@ class CommandBitPos : public Commander {
stop_ = *parse_stop;
}

if (args.size() >= 6 && util::EqualICase(args[5], "BIT")) {
is_bit_index_ = true;
}

auto parse_arg = ParseInt<int64_t>(args[2], 10);
if (!parse_arg) {
return {Status::RedisParseErr, errValueNotInteger};
Expand All @@ -189,7 +194,7 @@ class CommandBitPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos);
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(pos);
Expand All @@ -201,6 +206,7 @@ class CommandBitPos : public Commander {
int64_t stop_ = -1;
bool bit_ = false;
bool stop_given_ = false;
bool is_bit_index_ = false;
};

class CommandBitOp : public Commander {
Expand Down
4 changes: 4 additions & 0 deletions src/common/bit_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ static inline void SetBitTo(uint8_t *bits, int64_t i, bool bit_is_set) {
* not a single set bit in the bitmap. In this special case -1 is returned.
* */
inline int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit) {
if (count == 0) {
return -1;
}

int64_t res = 0;

if (bit) {
Expand Down
72 changes: 60 additions & 12 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "redis_bitmap.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "common/bit_util.h"
Expand Down Expand Up @@ -303,7 +305,7 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key, int64_t start, int64_t s
}

rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos) {
int64_t *pos, bool is_bit_index) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);

Expand All @@ -317,9 +319,13 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i

if (metadata.Type() == kRedisString) {
redis::BitmapString bitmap_string_db(storage_, namespace_);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos, is_bit_index);
}
std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, static_cast<int64_t>(metadata.size));

uint32_t to_bit_factor = is_bit_index ? 8 : 1;
auto size = static_cast<int64_t>(metadata.size) * static_cast<int64_t>(to_bit_factor);

std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, size);
auto u_start = static_cast<uint32_t>(start);
auto u_stop = static_cast<uint32_t>(stop);
if (u_start > u_stop) {
Expand All @@ -335,14 +341,39 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
return -1;
};

auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
for (uint32_t i = start; i <= stop; i++) {
if (bit && (byte & (1 << i)) != 0) return (int)i; // typecast to int since the value ranges from 0 to 7
if (!bit && (byte & (1 << i)) == 0) return (int)i;
}
return -1;
};

auto range_in_byte = [](uint32_t byte_start, uint32_t byte_end, uint32_t curr_byte, uint32_t start_bit,
uint32_t end_bit) -> std::pair<uint32_t, uint32_t> {
if (curr_byte == byte_start && curr_byte == byte_end) return {start_bit, end_bit};
if (curr_byte == byte_start) return {start_bit, 7};
if (curr_byte == byte_end) return {0, end_bit};
return {0, 7};
};

LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// if bit index, (Eg start = 1, stop = 35), then
// u_start = 1/8 = 0, u_stop = 35/8 = 4 (in bytes)
uint32_t start_segment_index = (u_start / to_bit_factor) / kBitmapSegmentBytes;
uint32_t stop_segment_index = (u_stop / to_bit_factor) / kBitmapSegmentBytes;
uint32_t start_bit_pos_in_byte = 0;
uint32_t stop_bit_pos_in_byte = 0;

if (is_bit_index) {
start_bit_pos_in_byte = u_start % 8;
stop_bit_pos_in_byte = u_stop % 8;
}
// Don't use multi get to prevent large range query, and take too much memory
// Searching bits in segments [start_index, stop_index].
for (uint32_t i = start_index; i <= stop_index; i++) {
for (uint32_t i = start_segment_index; i <= stop_segment_index; i++) {
rocksdb::PinnableSlice pin_value;
std::string sub_key =
InternalKey(ns_key, std::to_string(i * kBitmapSegmentBytes), metadata.version, storage_->IsSlotIdEncoded())
Expand All @@ -359,17 +390,34 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
continue;
}
size_t byte_pos_in_segment = 0;
if (i == start_index) byte_pos_in_segment = u_start % kBitmapSegmentBytes;
size_t byte_with_bit_start = -1;
size_t byte_with_bit_stop = -2;
// if bit index, (Eg start = 1, stop = 35), then
// byte_pos_in_segment should be calculated in bytes, hence divide by 8
if (i == start_segment_index) {
byte_pos_in_segment = (u_start / to_bit_factor) % kBitmapSegmentBytes;
byte_with_bit_start = byte_pos_in_segment;
}
size_t stop_byte_in_segment = pin_value.size();
if (i == stop_index) {
DCHECK_LE(u_stop % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = u_stop % kBitmapSegmentBytes + 1;
if (i == stop_segment_index) {
DCHECK_LE((u_stop / to_bit_factor) % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = (u_stop / to_bit_factor) % kBitmapSegmentBytes + 1;
byte_with_bit_stop = stop_byte_in_segment;
}
// Invariant:
// 1. pin_value.size() <= kBitmapSegmentBytes.
// 2. If it's the last segment, metadata.size % kBitmapSegmentBytes <= pin_value.size().
for (; byte_pos_in_segment < stop_byte_in_segment; byte_pos_in_segment++) {
int bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
int bit_pos_in_byte_value = -1;
if (is_bit_index) {
uint32_t start_bit = 0, stop_bit = 7;
std::tie(start_bit, stop_bit) = range_in_byte(byte_with_bit_start, byte_with_bit_stop, byte_pos_in_segment,
start_bit_pos_in_byte, stop_bit_pos_in_byte);
bit_pos_in_byte_value = bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, start_bit, stop_bit);
} else {
bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
}

if (bit_pos_in_byte_value != -1) {
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + byte_pos_in_segment * 8 + bit_pos_in_byte_value);
return rocksdb::Status::OK();
Expand All @@ -382,7 +430,7 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
// 1. If it's the last segment, we've done searching in the above loop.
// 2. If it's not the last segment, we can check if the segment is all 0.
if (pin_value.size() < kBitmapSegmentBytes) {
if (i == stop_index) {
if (i == stop_segment_index) {
continue;
}
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + pin_value.size() * 8);
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class Bitmap : public Database {
rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, std::string *value);
rocksdb::Status SetBit(const Slice &user_key, uint32_t bit_offset, bool new_bit, bool *old_bit);
rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop, bool is_bit_index, uint32_t *cnt);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos,
bool is_bit_index);
rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const Slice &user_key,
const std::vector<Slice> &op_keys, int64_t *len);
rocksdb::Status Bitfield(const Slice &user_key, const std::vector<BitfieldOperation> &ops,
Expand Down
79 changes: 64 additions & 15 deletions src/types/redis_bitmap_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,80 @@ rocksdb::Status BitmapString::BitCount(const std::string &raw_value, int64_t sta
}

rocksdb::Status BitmapString::BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop,
bool stop_given, int64_t *pos) {
bool stop_given, int64_t *pos, bool is_bit_index) {
std::string_view string_value = std::string_view{raw_value}.substr(Metadata::GetOffsetAfterExpire(raw_value[0]));
auto strlen = static_cast<int64_t>(string_value.size());
/* Convert negative and out-of-bound indexes */
std::tie(start, stop) = NormalizeRange(start, stop, strlen);

int64_t length = is_bit_index ? strlen * 8 : strlen;
std::tie(start, stop) = NormalizeRange(start, stop, length);

if (start > stop) {
*pos = -1;
} else {
int64_t bytes = stop - start + 1;
*pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) + start, bytes, bit);

/* If we are looking for clear bits, and the user specified an exact
* range with start-end, we can't consider the right of the range as
* zero padded (as we do when no explicit end is given).
*
* So if redisBitpos() returns the first bit outside the range,
* we return -1 to the caller, to mean, in the specified range there
* is not a single "0" bit. */
if (stop_given && bit == 0 && *pos == bytes * 8) {
return rocksdb::Status::OK();
}

int64_t byte_start = is_bit_index ? start / 8 : start;
int64_t byte_stop = is_bit_index ? stop / 8 : stop;
int64_t bit_in_start_byte = is_bit_index ? start % 8 : 0;
int64_t bit_in_stop_byte = is_bit_index ? stop % 8 : 7;
int64_t bytes_cnt = byte_stop - byte_start + 1;

auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
for (uint32_t i = start; i <= stop; i++) {
if (util::msb::GetBitFromByte(byte, i) == bit) {
return (int)i;
}
}
return -1;
};

// if the bit start and bit end are in the same byte, we can process it manually
if (is_bit_index && byte_start == byte_stop) {
int res = bit_pos_in_byte_startstop(string_value[byte_start], bit, bit_in_start_byte, bit_in_stop_byte);
if (res != -1) {
*pos = res + byte_start * 8;
return rocksdb::Status::OK();
}
*pos = -1;
return rocksdb::Status::OK();
}

if (is_bit_index && bit_in_start_byte != 0) {
// process first byte
int res = bit_pos_in_byte_startstop(string_value[byte_start], bit, bit_in_start_byte, 7);
if (res != -1) {
*pos = res + byte_start * 8;
return rocksdb::Status::OK();
}

byte_start++;
bytes_cnt--;
}

*pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) + byte_start, bytes_cnt, bit);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check byte_stop here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my bad, it handle the case in is_bit_index

if (is_bit_index && *pos != -1 && *pos != bytes_cnt * 8) {
// if the pos is more than stop bit, then it is not in the range
if (*pos > stop) {
*pos = -1;
return rocksdb::Status::OK();
}
if (*pos != -1) *pos += start * 8; /* Adjust for the bytes we skipped. */
}

/* If we are looking for clear bits, and the user specified an exact
* range with start-end, we tcan' consider the right of the range as
* zero padded (as we do when no explicit end is given).
*
* So if redisBitpos() returns the first bit outside the range,
* we return -1 to the caller, to mean, in the specified range there
* is not a single "0" bit. */
if (stop_given && bit == 0 && *pos == bytes_cnt * 8) {
*pos = -1;
return rocksdb::Status::OK();
}
if (*pos != -1) *pos += byte_start * 8; /* Adjust for the bytes we skipped. */

return rocksdb::Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_bitmap_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BitmapString : public Database {
static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, int64_t stop, bool is_bit_index,
uint32_t *cnt);
static rocksdb::Status BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos);
int64_t *pos, bool is_bit_index);
rocksdb::Status Bitfield(const Slice &ns_key, std::string *raw_value, const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>> *rets);
static rocksdb::Status BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value,
Expand Down
22 changes: 11 additions & 11 deletions tests/cppunit/types/bitmap_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ TEST_P(RedisBitmapTest, BitPosClearBit) {
///
/// String will set a empty string value when initializing, so, when first
/// querying, it should return -1.
bitmap_->BitPos(key_, false, 0, -1, /*stop_given=*/false, &pos);
bitmap_->BitPos(key_, false, 0, -1, /*stop_given=*/false, &pos, /*bit_index=*/false);
if (i == 0 && !use_bitmap) {
EXPECT_EQ(pos, -1);
} else {
Expand All @@ -201,7 +201,7 @@ TEST_P(RedisBitmapTest, BitPosSetBit) {
int64_t pos = 0;
int start_indexes[] = {0, 1, 124, 1025, 1027, 3 * 1024 + 1};
for (size_t i = 0; i < sizeof(start_indexes) / sizeof(start_indexes[0]); i++) {
bitmap_->BitPos(key_, true, start_indexes[i], -1, true, &pos);
bitmap_->BitPos(key_, true, start_indexes[i], -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(pos, offsets[i]);
}
auto s = bitmap_->Del(key_);
Expand All @@ -215,19 +215,19 @@ TEST_P(RedisBitmapTest, BitPosNegative) {
}
int64_t pos = 0;
// First bit is negative
bitmap_->BitPos(key_, false, 0, -1, true, &pos);
bitmap_->BitPos(key_, false, 0, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(0, pos);
// 8 * 1024 - 1 bit is positive
bitmap_->BitPos(key_, true, 0, -1, true, &pos);
bitmap_->BitPos(key_, true, 0, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(8 * 1024 - 1, pos);
// First bit in 1023 byte is negative
bitmap_->BitPos(key_, false, -1, -1, true, &pos);
bitmap_->BitPos(key_, false, -1, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(8 * 1023, pos);
// Last Bit in 1023 byte is positive
bitmap_->BitPos(key_, true, -1, -1, true, &pos);
bitmap_->BitPos(key_, true, -1, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(8 * 1024 - 1, pos);
// Large negative number will be normalized.
bitmap_->BitPos(key_, false, -10000, -10000, true, &pos);
bitmap_->BitPos(key_, false, -10000, -10000, true, &pos, /*bit_index=*/false);
EXPECT_EQ(0, pos);

auto s = bitmap_->Del(key_);
Expand All @@ -242,9 +242,9 @@ TEST_P(RedisBitmapTest, BitPosStopGiven) {
EXPECT_FALSE(bit);
}
int64_t pos = 0;
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos, /*bit_index=*/false);
EXPECT_EQ(-1, pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/false, &pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/false, &pos, /*bit_index=*/false);
EXPECT_EQ(8, pos);

// Set a bit at 8 not affect that
Expand All @@ -253,9 +253,9 @@ TEST_P(RedisBitmapTest, BitPosStopGiven) {
bitmap_->SetBit(key_, 8, true, &bit);
EXPECT_FALSE(bit);
}
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos, /*bit_index=*/false);
EXPECT_EQ(-1, pos);
bitmap_->BitPos(key_, false, 0, 1, /*stop_given=*/false, &pos);
bitmap_->BitPos(key_, false, 0, 1, /*stop_given=*/false, &pos, /*bit_index=*/false);
EXPECT_EQ(9, pos);

auto s = bitmap_->Del(key_);
Expand Down
Loading