234 changes: 110 additions & 124 deletions arrow-buffer/src/buffer/boolean.rs
@@ -15,12 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::bit_chunk_iterator::BitChunks;
use crate::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::bit_iterator::{BitIndexIterator, BitIndexU32Iterator, BitIterator, BitSliceIterator};
use crate::{
BooleanBufferBuilder, Buffer, MutableBuffer, bit_util, buffer_bin_and, buffer_bin_or,
buffer_bin_xor, buffer_unary_not,
};
use crate::bit_util::read_u64;
use crate::{BooleanBufferBuilder, Buffer, MutableBuffer, bit_util};

use std::ops::{BitAnd, BitOr, BitXor, Not};

@@ -73,7 +71,7 @@ use std::ops::{BitAnd, BitOr, BitXor, Not};
/// [`NullBuffer`]: crate::NullBuffer
#[derive(Debug, Clone, Eq)]
pub struct BooleanBuffer {
/// Underlying buffer (byte aligned)
/// Underlying buffer
buffer: Buffer,
/// Offset in bits (not bytes)
bit_offset: usize,
@@ -151,7 +149,10 @@ impl BooleanBuffer {
/// let result = BooleanBuffer::from_bits(&input, 4, 12);
/// assert_eq!(result.values(), &[0b10101100u8, 0b00001011u8]);
pub fn from_bits(src: impl AsRef<[u8]>, offset_in_bits: usize, len_in_bits: usize) -> Self {
Self::from_bitwise_unary_op(src, offset_in_bits, len_in_bits, |a| a)
let chunks = BitChunks::new(src.as_ref(), offset_in_bits, len_in_bits);
let iter = chunks.iter_padded();
let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
BooleanBuffer::new(buffer.into(), 0, len_in_bits)
}
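/// Editor's sketch of the padded chunk iteration used in `from_bits`,
/// assuming `BitChunks::iter_padded` yields every complete `u64` chunk and
/// then the remainder bits zero-padded to a full `u64` (hypothetical
/// stand-in; the PR's real implementation may differ).
fn _iter_padded_sketch(
    full_chunks: impl Iterator<Item = u64>,
    remainder_bits: u64,  // already masked to the valid low bits
    remainder_len: usize, // number of valid bits in `remainder_bits`
) -> impl Iterator<Item = u64> {
    // ceil(len_in_bits / 64) items in total: the trailing partial chunk
    // is emitted as a single zero-padded u64
    full_chunks.chain((remainder_len > 0).then_some(remainder_bits))
}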

/// Create a new [`BooleanBuffer`] by applying the bitwise operation to `op`
@@ -166,7 +167,7 @@
/// on the relevant bits; the input `u64` may contain irrelevant bits
/// and may be processed differently on different endian architectures.
/// * `op` may be called with input bits outside the requested range
/// * The output always has zero offset
/// * The output may have a non-zero bit offset if the source offset is not byte-aligned.
///
/// # See Also
/// - [`BooleanBuffer::from_bitwise_binary_op`] to create a new buffer from a binary operation
@@ -180,7 +181,8 @@ impl BooleanBuffer {
/// let result = BooleanBuffer::from_bitwise_unary_op(
/// &input, 0, 12, |a| !a
/// );
/// assert_eq!(result.values(), &[0b00110011u8, 0b11110101u8]);
/// // Values are padded
/// assert_eq!(result.values(), &[0b00110011u8, 0b11110101u8, 255, 255, 255, 255, 255, 255]);
/// ```
pub fn from_bitwise_unary_op<F>(
src: impl AsRef<[u8]>,
@@ -191,67 +193,21 @@ impl BooleanBuffer {
where
F: FnMut(u64) -> u64,
{
// try fast path for aligned input
if offset_in_bits & 0x7 == 0 {
// align to byte boundary
let aligned = &src.as_ref()[offset_in_bits / 8..];
if let Some(result) =
Self::try_from_aligned_bitwise_unary_op(aligned, len_in_bits, &mut op)
{
return result;
}
}

let chunks = BitChunks::new(src.as_ref(), offset_in_bits, len_in_bits);
let mut result = MutableBuffer::with_capacity(chunks.num_u64s() * 8);
for chunk in chunks.iter() {
// SAFETY: reserved enough capacity above, (exactly num_u64s()
// items) and we assume `BitChunks` correctly reports upper bound
unsafe {
result.push_unchecked(op(chunk));
}
}
if chunks.remainder_len() > 0 {
debug_assert!(result.capacity() >= result.len() + 8); // should not reallocate
// SAFETY: reserved enough capacity above, (exactly num_u64s()
// items) and we assume `BitChunks` correctly reports upper bound
unsafe {
result.push_unchecked(op(chunks.remainder_bits()));
}
// Just pushed one u64, which may have trailing zeros
result.truncate(chunks.num_bytes());
}

BooleanBuffer {
buffer: Buffer::from(result),
bit_offset: 0,
bit_len: len_in_bits,
}
}

/// Fast path for [`Self::from_bitwise_unary_op`] when input is aligned to
/// 8-byte (64-bit) boundaries
///
/// Returns None if the fast path cannot be taken
fn try_from_aligned_bitwise_unary_op<F>(
src: &[u8],
len_in_bits: usize,
op: &mut F,
) -> Option<Self>
where
F: FnMut(u64) -> u64,
{
// Safety: all valid bytes are valid u64s
let (prefix, aligned_u64s, suffix) = unsafe { src.align_to::<u64>() };
if !(prefix.is_empty() && suffix.is_empty()) {
// Couldn't make this case any faster than the default path, see
// https://github.com/apache/arrow-rs/pull/8996/changes#r2620022082
return None;
}
// the buffer is word (64 bit) aligned, so use optimized Vec code.
let result_u64s: Vec<u64> = aligned_u64s.iter().map(|l| op(*l)).collect();
let buffer = Buffer::from(result_u64s);
Some(BooleanBuffer::new(buffer, 0, len_in_bits))
let end = offset_in_bits + len_in_bits;
let start_bit = offset_in_bits % 8;
// align to byte boundaries
let aligned = &src.as_ref()[offset_in_bits / 8..bit_util::ceil(end, 8)];
// Use unaligned code path, handle remainder bytes
let chunks = aligned.chunks_exact(8);
let remainder = chunks.remainder();
let iter = chunks.map(|c| u64::from_le_bytes(c.try_into().unwrap()));
let vec_u64s: Vec<u64> = if remainder.is_empty() {
iter.map(&mut op).collect()
} else {
iter.chain(Some(read_u64(remainder))).map(&mut op).collect()
};

return BooleanBuffer::new(Buffer::from(vec_u64s), start_bit, len_in_bits);
}
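/// Editor's sketch of the `read_u64` helper imported at the top of this
/// file but not shown in the diff. Assumed semantics: read up to eight
/// bytes as a little-endian `u64`, zero-padding the missing high bytes
/// (the real `bit_util::read_u64` may differ).
fn _read_u64_sketch(bytes: &[u8]) -> u64 {
    let mut buf = [0u8; 8];
    let n = bytes.len().min(8);
    buf[..n].copy_from_slice(&bytes[..n]);
    u64::from_le_bytes(buf)
}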

/// Create a new [`BooleanBuffer`] by applying the bitwise operation `op` to
@@ -276,7 +232,8 @@ impl BooleanBuffer {
/// let result = BooleanBuffer::from_bitwise_binary_op(
/// &left, 0, &right, 0, 12, |a, b| a & b
/// );
/// assert_eq!(result.inner().as_slice(), &[0b10001000u8, 0b00001000u8]);
/// // Note: the output is padded to the next u64 boundary
/// assert_eq!(result.inner().as_slice(), &[0b10001000u8, 0b00001000u8, 0, 0, 0, 0, 0, 0]);
/// ```
///
/// # Example: Create new [`BooleanBuffer`] from bitwise `OR` of two byte slices
@@ -288,7 +245,8 @@ impl BooleanBuffer {
/// let result = BooleanBuffer::from_bitwise_binary_op(
/// &left, 4, &right, 0, 12, |a, b| a | b
/// );
/// assert_eq!(result.inner().as_slice(), &[0b10101110u8, 0b00001111u8]);
/// // Note: the output is padded to the next u64 boundary
/// assert_eq!(result.inner().as_slice(), &[0b10101110u8, 0b00001111u8, 0, 0, 0, 0, 0, 0]);
/// ```
pub fn from_bitwise_binary_op<F>(
left: impl AsRef<[u8]>,
@@ -303,6 +261,15 @@
{
let left = left.as_ref();
let right = right.as_ref();

if left.len() < len_in_bits / 8 {
panic!("The left buffer is too small for the specified length");
}

if right.len() < len_in_bits / 8 {
panic!("The right buffer is too small for the specified length");
}

// try fast path for aligned input
// If the underlying buffers are aligned to u64 we can apply the operation directly on the u64 slices
// to improve performance.
@@ -336,28 +303,60 @@ impl BooleanBuffer {
}
}
}

if left_offset_in_bits == right_offset_in_bits {
// is aligned to byte boundary
if left_offset_in_bits & 0x7 == 0 {
let left = &left[left_offset_in_bits / 8..];
let right = &right[right_offset_in_bits / 8..];

let left_chunks = BitChunks::new(left, 0, len_in_bits);
let right_chunks = BitChunks::new(right, 0, len_in_bits);
let mut result = Vec::with_capacity(bit_util::ceil(len_in_bits, 64));

let l_iter = left
.chunks_exact(8)
.map(|c| u64::from_le_bytes(c.try_into().unwrap()));
let r_iter = right
.chunks_exact(8)
.map(|c| u64::from_le_bytes(c.try_into().unwrap()));

result.extend(l_iter.zip(r_iter).map(|(l, r)| op(l, r)));

if left_chunks.remainder_len() > 0 {
result.push(op(
left_chunks.remainder_bits(),
right_chunks.remainder_bits(),
));
}
return BooleanBuffer::new(Buffer::from(result), 0, len_in_bits);
}

// both buffers have the same offset, we can use UnalignedBitChunk for both
let left_chunks = UnalignedBitChunk::new(left, left_offset_in_bits, len_in_bits);
Review comment (contributor, author): I think we actually don't have to use this part (use the byte-aligned one above and set the correct offset). A sketch of that idea follows.
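A minimal sketch of that alternative (editor's hypothetical code, not part of the PR): round the shared bit offset down to a byte boundary, apply `op` over whole little-endian `u64` words, and record the sub-byte offset in the result instead of shifting bits through `UnalignedBitChunk`. The function name and the raw `(words, offset)` return type are invented here to keep the example self-contained.

```rust
fn binary_op_same_offset<F: FnMut(u64, u64) -> u64>(
    left: &[u8],
    right: &[u8],
    offset_in_bits: usize,
    len_in_bits: usize,
    mut op: F,
) -> (Vec<u64>, usize) {
    let start_bit = offset_in_bits % 8; // becomes the result's bit offset
    let first = offset_in_bits / 8;
    let last = (offset_in_bits + len_in_bits + 7) / 8; // ceil of end bit
    // Read up to 8 bytes as a zero-padded little-endian u64
    let read = |c: &[u8]| {
        let mut b = [0u8; 8];
        b[..c.len()].copy_from_slice(c);
        u64::from_le_bytes(b)
    };
    let words: Vec<u64> = left[first..last]
        .chunks(8)
        .zip(right[first..last].chunks(8))
        .map(|(l, r)| op(read(l), read(r)))
        .collect();
    (words, start_bit)
}
```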

let right_chunks = UnalignedBitChunk::new(right, right_offset_in_bits, len_in_bits);

let chunks = left_chunks
.zip(&right_chunks)
.map(|(left, right)| op(left, right));
// Soundness: `UnalignedBitChunk`, like `BitChunks`, is a trusted-length
// iterator which correctly reports its upper bound
let buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };

return BooleanBuffer::new(buffer.into(), left_chunks.lead_padding(), len_in_bits);
}

let left_chunks = BitChunks::new(left, left_offset_in_bits, len_in_bits);
let right_chunks = BitChunks::new(right, right_offset_in_bits, len_in_bits);

let chunks = left_chunks
.iter()
.zip(right_chunks.iter())
.zip_padded(&right_chunks)
.map(|(left, right)| op(left, right));
// Soundness: `BitChunks` is a trusted-length iterator which
// correctly reports its upper bound
let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };

let remainder_bytes = bit_util::ceil(left_chunks.remainder_len(), 8);
let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits());
// we are counting bits starting from the least significant bit, so to_le_bytes should be correct
let rem = &rem.to_le_bytes()[0..remainder_bytes];
buffer.extend_from_slice(rem);
let buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };

BooleanBuffer {
buffer: Buffer::from(buffer),
bit_offset: 0,
bit_len: len_in_bits,
}
BooleanBuffer::new(buffer.into(), 0, len_in_bits)
}

/// Returns the number of set bits in this buffer
@@ -506,11 +505,7 @@ impl Not for &BooleanBuffer {
type Output = BooleanBuffer;

fn not(self) -> Self::Output {
BooleanBuffer {
buffer: buffer_unary_not(&self.buffer, self.bit_offset, self.bit_len),
bit_offset: 0,
bit_len: self.bit_len,
}
BooleanBuffer::from_bitwise_unary_op(&self.buffer, self.bit_offset, self.bit_len, |a| !a)
}
}

@@ -519,17 +514,14 @@ impl BitAnd<&BooleanBuffer> for &BooleanBuffer {

fn bitand(self, rhs: &BooleanBuffer) -> Self::Output {
assert_eq!(self.bit_len, rhs.bit_len);
BooleanBuffer {
buffer: buffer_bin_and(
&self.buffer,
self.bit_offset,
&rhs.buffer,
rhs.bit_offset,
self.bit_len,
),
bit_offset: 0,
bit_len: self.bit_len,
}
BooleanBuffer::from_bitwise_binary_op(
&self.buffer,
self.bit_offset,
&rhs.buffer,
rhs.bit_offset,
self.bit_len,
|a, b| a & b,
)
}
}

@@ -538,17 +530,14 @@ impl BitOr<&BooleanBuffer> for &BooleanBuffer {

fn bitor(self, rhs: &BooleanBuffer) -> Self::Output {
assert_eq!(self.bit_len, rhs.bit_len);
BooleanBuffer {
buffer: buffer_bin_or(
&self.buffer,
self.bit_offset,
&rhs.buffer,
rhs.bit_offset,
self.bit_len,
),
bit_offset: 0,
bit_len: self.bit_len,
}
BooleanBuffer::from_bitwise_binary_op(
&self.buffer,
self.bit_offset,
&rhs.buffer,
rhs.bit_offset,
self.bit_len,
|a, b| a | b,
)
}
}

@@ -557,17 +546,14 @@ impl BitXor<&BooleanBuffer> for &BooleanBuffer {

fn bitxor(self, rhs: &BooleanBuffer) -> Self::Output {
assert_eq!(self.bit_len, rhs.bit_len);
BooleanBuffer {
buffer: buffer_bin_xor(
&self.buffer,
self.bit_offset,
&rhs.buffer,
rhs.bit_offset,
self.bit_len,
),
bit_offset: 0,
bit_len: self.bit_len,
}
BooleanBuffer::from_bitwise_binary_op(
&self.buffer,
self.bit_offset,
&rhs.buffer,
rhs.bit_offset,
self.bit_len,
|a, b| a ^ b,
)
}
}
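// Editor's usage sketch for the operator impls above (assumed to match the
// public arrow-buffer API used elsewhere in this file: `Buffer: From<Vec<u8>>`,
// `BooleanBuffer::new`, and `count_set_bits`).
#[cfg(test)]
fn _operator_usage_sketch() {
    let a = BooleanBuffer::new(Buffer::from(vec![0b1010u8]), 0, 4);
    let b = BooleanBuffer::new(Buffer::from(vec![0b0110u8]), 0, 4);
    let and = &a & &b; // bitwise AND via from_bitwise_binary_op: 0b0010
    assert_eq!(and.count_set_bits(), 1);
}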

5 changes: 4 additions & 1 deletion arrow-buffer/src/buffer/immutable.rs
@@ -1029,7 +1029,10 @@ mod tests {

let assert_preserved = |offset: usize, len: usize| {
let new_buf = buf.bit_slice(offset, len);
assert_eq!(new_buf.len(), bit_util::ceil(len, 8));
assert_eq!(
new_buf.len(),
bit_util::round_upto_multiple_of_64(bit_util::ceil(len, 8))
);
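// Editor's illustration of the padding arithmetic asserted above
// (pure arithmetic, no assumptions about bit_slice itself): a 12-bit
// slice occupies ceil(12, 8) = 2 bytes, rounded up to a 64-byte multiple.
assert_eq!(bit_util::round_upto_multiple_of_64(bit_util::ceil(12, 8)), 64);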

// if the offset is not byte-aligned, we have to create a deep copy to a new buffer
// (since the `offset` value inside a Buffer is byte-granular, not bit-granular), so
15 changes: 8 additions & 7 deletions arrow-buffer/src/buffer/mutable.rs
@@ -872,15 +872,16 @@ impl MutableBuffer {

let mut buffer = MutableBuffer::new(len);

let mut dst = buffer.data.as_ptr();
for item in iterator {
let mut dst = buffer.data.as_ptr() as *mut T;
iterator.for_each(|item| {
Review comment (contributor, author): `for_each` uses `fold`, so it can use a more efficient implementation if one is available. A short illustration follows.
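An editor's illustration of that point (hypothetical example, not arrow-rs code): `Iterator::for_each` is defined in terms of `fold`, so adapters such as `Chain` that override `fold` can drive each underlying iterator in a tight loop, whereas an external `for` loop must branch through the adapter's state on every `next()` call.

```rust
fn sum_chained(a: &[u64], b: &[u64]) -> u64 {
    let mut total = 0u64;
    // for_each forwards to fold, which Chain implements by folding
    // each underlying slice iterator in turn
    a.iter().chain(b.iter()).for_each(|v| total += *v);
    total
}
```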

// note how there is no reserve here (compared with `extend_from_iter`)
let src = item.to_byte_slice().as_ptr();
unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
dst = unsafe { dst.add(item_size) };
}
unsafe {
std::ptr::write(dst, item);
dst = dst.add(1);
}
});
assert_eq!(
unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
unsafe { (dst as *mut u8).offset_from(buffer.data.as_ptr()) } as usize,
len,
"Trusted iterator length was not accurately reported"
);