Skip to content
Merged
8 changes: 7 additions & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "commands/command_parser.h"
#include "error_constants.h"
#include "server/server.h"
#include "status.h"
#include "types/redis_bitmap.h"

namespace redis {
Expand Down Expand Up @@ -171,6 +172,10 @@ class CommandBitPos : public Commander {
stop_ = *parse_stop;
}

if (args.size() >= 6 && util::EqualICase(args[5], "BIT")) {
is_bit_index_ = true;
}

auto parse_arg = ParseInt<int64_t>(args[2], 10);
if (!parse_arg) {
return {Status::RedisParseErr, errValueNotInteger};
Expand All @@ -189,7 +194,7 @@ class CommandBitPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos);
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(pos);
Expand All @@ -201,6 +206,7 @@ class CommandBitPos : public Commander {
int64_t stop_ = -1;
bool bit_ = false;
bool stop_given_ = false;
bool is_bit_index_ = false;
};

class CommandBitOp : public Commander {
Expand Down
60 changes: 51 additions & 9 deletions src/common/bit_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,7 @@ static inline void SetBitTo(uint8_t *bits, int64_t i, bool bit_is_set) {
bits[i / 8] ^= static_cast<uint8_t>(-static_cast<uint8_t>(bit_is_set) ^ bits[i / 8]) & kBitmask[i % 8];
}

/* Return the position of the first bit set to one (if 'bit' is 1) or
* zero (if 'bit' is 0) in the bitmap starting at 's' and long 'count' bytes.
*
* The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
* no zero bit is found, it returns count*8 assuming the string is zero
* padded on the right. However if 'bit' is 1 it is possible that there is
* not a single set bit in the bitmap. In this special case -1 is returned.
* */
inline int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit) {
inline int64_t RawBitposBytes(const uint8_t *c, int64_t count, bool bit) {
int64_t res = 0;

if (bit) {
Expand Down Expand Up @@ -144,6 +136,56 @@ inline int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit) {
return res;
}

/* Return the position of the first bit set to one (if 'bit' is 1) or
* zero (if 'bit' is 0) in the bitmap 's' starting at 'start_bit' and
* ending at 'end_bit'
*
* The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
* no zero bit is found, it returns count*8 assuming the string is zero
* padded on the right. However if 'bit' is 1 it is possible that there is
* not a single set bit in the bitmap. In this special case -1 is returned.
* */
inline int64_t RawBitpos(const uint8_t *s, int64_t start_bit, int64_t end_bit, bool bit) {
int64_t start_byte = start_bit / 8;
int64_t end_byte = end_bit / 8;
int64_t start_bit_in_byte = start_bit % 8;
int64_t end_bit_in_byte = end_bit % 8;

// If the range is contained in a single byte
if (start_byte == end_byte) {
for (int64_t i = start_bit_in_byte; i <= end_bit_in_byte; i++) {
if (msb::GetBitFromByte(s[start_byte], i) == bit) {
return i + start_byte * 8;
}
}

// return count if no bit is found and bit is 0, else return -1
return bit ? -1 : (end_bit - start_bit + 1);
}

// Check the start byte
for (int64_t i = start_bit_in_byte; i < 8; i++) {
if (msb::GetBitFromByte(s[start_byte], i) == bit) {
return i + start_byte * 8;
}
}

// iterate over long bytes in the middle
int64_t res = msb::RawBitposBytes(s + start_byte + 1, end_byte - start_byte - 1, bit);
if (res != -1 && res != (end_byte - start_byte - 1) * 8) {
return res + (start_byte + 1) * 8;
}

// check the last byte
for (int64_t i = 0; i <= end_bit_in_byte; i++) {
if (msb::GetBitFromByte(s[end_byte], i) == bit) {
return i + end_byte * 8;
}
}

return bit ? -1 : (end_bit - start_bit + 1);
}

} // namespace msb

} // namespace util
64 changes: 55 additions & 9 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key, int64_t start, int64_t s
}

rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos) {
int64_t *pos, bool is_bit_index) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);

Expand All @@ -317,9 +317,13 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i

if (metadata.Type() == kRedisString) {
redis::BitmapString bitmap_string_db(storage_, namespace_);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos, is_bit_index);
}
std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, static_cast<int64_t>(metadata.size));

uint32_t to_bit_factor = is_bit_index ? 8 : 1;
auto size = static_cast<int64_t>(metadata.size) * static_cast<int64_t>(to_bit_factor);

std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, size);
auto u_start = static_cast<uint32_t>(start);
auto u_stop = static_cast<uint32_t>(stop);
if (u_start > u_stop) {
Expand All @@ -335,11 +339,28 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
return -1;
};

auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
for (uint32_t i = start; i <= stop; i++) {
if (bit && (byte & (1 << i)) != 0) return (int)i; // typecast to int since the value ranges from 0 to 7
if (!bit && (byte & (1 << i)) == 0) return (int)i;
}
return -1;
};

LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// if bit index, (Eg start = 1, stop = 35), then
// u_start = 1/8 = 0, u_stop = 35/8 = 4 (in bytes)
uint32_t start_index = (u_start / to_bit_factor) / kBitmapSegmentBytes;
uint32_t stop_index = (u_stop / to_bit_factor) / kBitmapSegmentBytes;
uint32_t start_bit_pos_in_byte = 0;
uint32_t stop_bit_pos_in_byte = 0;

if (is_bit_index) {
start_bit_pos_in_byte = u_start % 8;
stop_bit_pos_in_byte = u_stop % 8;
}
// Don't use multi get to prevent large range query, and take too much memory
// Searching bits in segments [start_index, stop_index].
for (uint32_t i = start_index; i <= stop_index; i++) {
Expand All @@ -359,17 +380,42 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
continue;
}
size_t byte_pos_in_segment = 0;
if (i == start_index) byte_pos_in_segment = u_start % kBitmapSegmentBytes;
size_t byte_with_bit_start = -1;
size_t byte_with_bit_stop = -2;
// if bit index, (Eg start = 1, stop = 35), then
// byte_pos_in_segment should be calculated in bytes, hence divide by 8
if (i == start_index) {
byte_pos_in_segment = (u_start / to_bit_factor) % kBitmapSegmentBytes;
byte_with_bit_start = byte_pos_in_segment;
}
size_t stop_byte_in_segment = pin_value.size();
if (i == stop_index) {
DCHECK_LE(u_stop % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = u_stop % kBitmapSegmentBytes + 1;
DCHECK_LE((u_stop / to_bit_factor) % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = (u_stop / to_bit_factor) % kBitmapSegmentBytes + 1;
byte_with_bit_stop = stop_byte_in_segment;
}
// Invariant:
// 1. pin_value.size() <= kBitmapSegmentBytes.
// 2. If it's the last segment, metadata.size % kBitmapSegmentBytes <= pin_value.size().
for (; byte_pos_in_segment < stop_byte_in_segment; byte_pos_in_segment++) {
int bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
int bit_pos_in_byte_value = -1;
if (is_bit_index) {
if (byte_with_bit_start == byte_with_bit_stop && byte_pos_in_segment == byte_with_bit_start) {
bit_pos_in_byte_value = bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, start_bit_pos_in_byte,
stop_bit_pos_in_byte);
} else if (byte_pos_in_segment == byte_with_bit_start) {
bit_pos_in_byte_value =
bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, start_bit_pos_in_byte, 7);
} else if (byte_pos_in_segment == byte_with_bit_stop) {
bit_pos_in_byte_value =
bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, 0, stop_bit_pos_in_byte);
} else {
bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
}
} else {
bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
}

if (bit_pos_in_byte_value != -1) {
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + byte_pos_in_segment * 8 + bit_pos_in_byte_value);
return rocksdb::Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class Bitmap : public Database {
rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, std::string *value);
rocksdb::Status SetBit(const Slice &user_key, uint32_t bit_offset, bool new_bit, bool *old_bit);
rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop, bool is_bit_index, uint32_t *cnt);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos,
bool is_bit_index);
rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const Slice &user_key,
const std::vector<Slice> &op_keys, int64_t *len);
rocksdb::Status Bitfield(const Slice &user_key, const std::vector<BitfieldOperation> &ops,
Expand Down
18 changes: 12 additions & 6 deletions src/types/redis_bitmap_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,24 @@ rocksdb::Status BitmapString::BitCount(const std::string &raw_value, int64_t sta
}

rocksdb::Status BitmapString::BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop,
bool stop_given, int64_t *pos) {
bool stop_given, int64_t *pos, bool is_bit_index) {
std::string_view string_value = std::string_view{raw_value}.substr(Metadata::GetOffsetAfterExpire(raw_value[0]));
auto strlen = static_cast<int64_t>(string_value.size());
/* Convert negative and out-of-bound indexes */
std::tie(start, stop) = NormalizeRange(start, stop, strlen);

int64_t length = is_bit_index ? strlen * 8 : strlen;
std::tie(start, stop) = NormalizeRange(start, stop, length);

if (start > stop) {
*pos = -1;
} else {
int64_t bytes = stop - start + 1;
*pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) + start, bytes, bit);
if (!is_bit_index) {
start *= 8;
stop = (stop * 8) + 7;
}

int64_t count_bits = stop - start + 1;
*pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()), start, stop, bit);

/* If we are looking for clear bits, and the user specified an exact
* range with start-end, we can't consider the right of the range as
Expand All @@ -119,11 +126,10 @@ rocksdb::Status BitmapString::BitPos(const std::string &raw_value, bool bit, int
* So if redisBitpos() returns the first bit outside the range,
* we return -1 to the caller, to mean, in the specified range there
* is not a single "0" bit. */
if (stop_given && bit == 0 && *pos == bytes * 8) {
if (stop_given && bit == 0 && *pos == count_bits) {
*pos = -1;
return rocksdb::Status::OK();
}
if (*pos != -1) *pos += start * 8; /* Adjust for the bytes we skipped. */
}
return rocksdb::Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_bitmap_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BitmapString : public Database {
static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, int64_t stop, bool is_bit_index,
uint32_t *cnt);
static rocksdb::Status BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos);
int64_t *pos, bool is_bit_index);
rocksdb::Status Bitfield(const Slice &ns_key, std::string *raw_value, const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>> *rets);
static rocksdb::Status BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value,
Expand Down
Loading